http://git-wip-us.apache.org/repos/asf/stratos/blob/8c359dc1/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/interfaces/CloudControllerService.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/interfaces/CloudControllerService.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/interfaces/CloudControllerService.java deleted file mode 100644 index 1e71129..0000000 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/interfaces/CloudControllerService.java +++ /dev/null @@ -1,231 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.stratos.cloud.controller.interfaces; - -import org.apache.stratos.cloud.controller.deployment.partition.Partition; -import org.apache.stratos.cloud.controller.exception.*; -import org.apache.stratos.cloud.controller.pojo.*; -import org.apache.stratos.messaging.domain.topology.ClusterStatus; - -/** - * This API provides a way to communicate with underline - * Infrastructure which are supported by <i>jClouds</i>. - * - */ -public interface CloudControllerService { - - /** - * Deploys a Cartridge configuration - * @param cartridgeConfig cartridge configuration to be deployed - * @throws InvalidCartridgeDefinitionException if the cartridge configuration is not valid. - * @throws InvalidIaasProviderException if the iaas providers configured are not valid. - * @throws IllegalArgumentException if the provided argument is not valid. - */ - void deployCartridgeDefinition(CartridgeConfig cartridgeConfig) - throws InvalidCartridgeDefinitionException, InvalidIaasProviderException; - - /** - * Undeploys a Cartridge configuration which is already deployed. - * @param cartridgeType type of the cartridge to be undeployed. - * @throws InvalidCartridgeTypeException if the cartridge type specified is not a deployed cartridge. - */ - public void undeployCartridgeDefinition(String cartridgeType) throws InvalidCartridgeTypeException; - - public void deployServiceGroup(ServiceGroup servicegroup) throws InvalidServiceGroupException; - - public void undeployServiceGroup(String name) throws InvalidServiceGroupException; - - public ServiceGroup getServiceGroup (String name) throws InvalidServiceGroupException; - - public String []getServiceGroupSubGroups (String name) throws InvalidServiceGroupException; - - public String [] getServiceGroupCartridges (String name) throws InvalidServiceGroupException; - - public Dependencies getServiceGroupDependencies (String name) throws InvalidServiceGroupException; - - /** - * Validate a given {@link Partition} for basic property existence. - * @param partition partition to be validated. - * @return whether the partition is a valid one. - * @throws InvalidPartitionException if the partition is invalid. - */ - boolean validatePartition(Partition partition) throws InvalidPartitionException; - - /** - * Validate a given deployment policy. - * @param cartridgeType type of the cartridge - * @param partitions partitions - * @return whether the policy is a valid one against the given Cartridge. - * @throws InvalidPartitionException if the policy contains at least one invalid partition. - * @throws InvalidCartridgeTypeException if the given Cartridge type is not a valid one. - */ - boolean validateDeploymentPolicy(String cartridgeType, Partition[] partitions) - throws InvalidPartitionException, InvalidCartridgeTypeException; - - /** - * <p> - * Registers the details of a newly created service cluster. This will override an already - * present service cluster, if there is any. A service cluster is uniquely identified by its - * domain and sub domain combination. - * </p> - * @param registrant information about the new subscription. - * @return whether the registration is successful or not. - * - * @throws UnregisteredCartridgeException - * when the cartridge type requested by this service is - * not a registered one. - */ - boolean registerService(Registrant registrant) throws UnregisteredCartridgeException; - - /** - * Calling this method will result in an instance startup, which is belong - * to the provided Cluster ID. Also note that the instance that is starting up - * belongs to the group whose name is derived from its Cluster ID, replacing <i>.</i> - * by a hyphen (<i>-</i>). - * @param member Context with cluster id, partition etc. - * @return updated {@link MemberContext} - * @throws UnregisteredCartridgeException if the requested Cartridge type is not a registered one. - * @throws InvalidIaasProviderException if the iaas requested is not valid. - */ - MemberContext startInstance(MemberContext member) throws UnregisteredCartridgeException, InvalidIaasProviderException; - - /** - * Create a container cluster. - * @param {@link ContainerClusterContext} Context with cluster id, and host cluster details. - * @return a list of {@link MemberContext}s correspond to each Pod created. - * @throws UnregisteredCartridgeException if the requested Cartridge type is not a registered one. - */ - MemberContext[] startContainers(ContainerClusterContext clusterContext) throws UnregisteredCartridgeException; - - /** - * Calling this method will result in termination of the instance with given member id in the given Partition. - * - * @param memberId - * member ID of the instance to be terminated. - * @return whether an instance terminated successfully or not. - */ - void terminateInstance(String memberId) throws InvalidMemberException, InvalidCartridgeTypeException; - - /** - * Calling this method will result in termination of all instances belong - * to the provided cluster ID. - * - * @param clusterId - * cluster ID of the instance to be terminated. - * @return whether an instance terminated successfully or not. - */ - void terminateAllInstances(String clusterId) throws InvalidClusterException; - - /** - * Terminate all containers of the given cluster. - * @param clusterId id of the subjected cluster. - * @return terminated {@link MemberContext}s - * @throws InvalidClusterException - */ - MemberContext[] terminateAllContainers(String clusterId) throws InvalidClusterException; - - /** - * Terminate a given member/Kubernetes Pod. - * @param memberId member/Pod id to be terminated. - * @return terminated {@link MemberContext} - * @throws MemberTerminationFailedException - */ - MemberContext terminateContainer(String memberId) throws MemberTerminationFailedException; - - /** - * Update the Kubernetes controller created for the given cluster with the specified number of replicas. - * @param clusterId id of the subjected cluster. - * @param replicas total number of replicas to be set to the controller. - * @return newly created Members if any / terminated {@link MemberContext} in scale down scenario. - * @throws InvalidClusterException - */ - MemberContext[] updateContainers(String clusterId, int replicas) throws UnregisteredCartridgeException; - - /** - * Update the topology with current cluster status. - * @param serviceName id of service which the cluster belongs to. - * @param clusterId id of the subjected cluster. - * @param instanceId id of the cluster instance. - * @param status total number of replicas to be set to the controller. - */ - void updateClusterStatus(String serviceName, String clusterId, String instanceId, ClusterStatus status); - - /** - * Unregister a docker service identified by the given cluster id. - * @param clusterId service cluster id. - * @throws UnregisteredClusterException if the service cluster requested is not a registered one. - */ - void unregisterDockerService(String clusterId) throws UnregisteredClusterException; - - /** - * Unregister the service cluster identified by the given cluster id. - * @param clusterId service cluster id. - * @throws UnregisteredClusterException if the service cluster requested is not a registered one. - */ - void unregisterService(String clusterId) throws UnregisteredClusterException; - - /** - * This method will return the information regarding the given cartridge, if present. - * Else this will return <code>null</code>. - * - * @param cartridgeType - * type of the cartridge. - * @return {@link org.apache.stratos.cloud.controller.pojo.CartridgeInfo} of the given cartridge type or <code>null</code>. - * @throws UnregisteredCartridgeException if there is no registered cartridge with this type. - */ - CartridgeInfo getCartridgeInfo(String cartridgeType) throws UnregisteredCartridgeException; - - /** - * Calling this method will result in returning the types of {@link org.apache.stratos.cloud.controller.pojo.Cartridge}s - * registered in Cloud Controller. - * - * @return String array containing types of registered {@link org.apache.stratos.cloud.controller.pojo.Cartridge}s. - */ - String[] getRegisteredCartridges(); - - /** - * Returns the {@link org.apache.stratos.cloud.controller.pojo.ClusterContext} object associated with the given cluster id, or null if not found - * - * @param clusterId cluster id - * @return {@link org.apache.stratos.cloud.controller.pojo.ClusterContext} object associated with the given cluster id, or null - */ - public ClusterContext getClusterContext (String clusterId); - - /** - * Creates the clusters relevant to an application in the topology model - * - * @param appId application id - * @param appClustersContexts cluster information holder object - * @throws ApplicationClusterRegistrationException if the cluster information are null/empty - */ - public void createApplicationClusters(String appId, ApplicationClusterContextDTO[] appClustersContexts) throws - ApplicationClusterRegistrationException; - - /** - * Creates a cluster instance with the given information - * - * @param serviceType serviceType - * @param clusterId cluster id - * @param alias alias provided in the subscription parameters - * @param instanceId instance id - * @throws ClusterInstanceCreationException if an y error occurs in cluster instance creation - */ - public void createClusterInstance (String serviceType, String clusterId, String alias, String instanceId) throws - ClusterInstanceCreationException; -}
http://git-wip-us.apache.org/repos/asf/stratos/blob/8c359dc1/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/interfaces/Iaas.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/interfaces/Iaas.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/interfaces/Iaas.java deleted file mode 100644 index 00766de..0000000 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/interfaces/Iaas.java +++ /dev/null @@ -1,173 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.stratos.cloud.controller.interfaces; - -import org.jclouds.compute.ComputeService; -import org.jclouds.compute.domain.NodeMetadata; -import org.jclouds.compute.domain.Template; -import org.apache.stratos.cloud.controller.exception.InvalidHostException; -import org.apache.stratos.cloud.controller.exception.InvalidRegionException; -import org.apache.stratos.cloud.controller.exception.InvalidZoneException; -import org.apache.stratos.cloud.controller.pojo.IaasProvider; -import org.apache.stratos.cloud.controller.validate.interfaces.PartitionValidator; - -/** - * All IaaSes that are going to support by Cloud Controller, should extend this abstract class. - */ -public abstract class Iaas { - /** - * Reference to the corresponding {@link IaasProvider} - */ - private IaasProvider iaasProvider; - - public Iaas(IaasProvider iaasProvider) { - this.setIaasProvider(iaasProvider); - } - - public IaasProvider getIaasProvider() { - return iaasProvider; - } - - public void setIaasProvider(IaasProvider iaasProvider) { - this.iaasProvider = iaasProvider; - } - - /** - * This should build the {@link ComputeService} object and the {@link Template} object, - * using the information from {@link IaasProvider} and should set the built - * {@link ComputeService} object in the {@link IaasProvider#setComputeService(ComputeService)} - * and also should set the built {@link Template} object in the - * {@link IaasProvider#setTemplate(Template)}. - */ - public abstract void buildComputeServiceAndTemplate(); - - /** - * This method provides a way to set payload that can be obtained from {@link IaasProvider#getPayload()} - * in the {@link Template} of this IaaS. - */ - public abstract void setDynamicPayload(); - - /** - * This will obtain an IP address from the allocated list and associate that IP with this node. - * @param node Node to be associated with an IP. - * @return associated public IP. - */ - public abstract String associateAddress(NodeMetadata node); - - /** - * This will obtain a predefined IP address and associate that IP with this node, if ip is already in use allocate ip from pool - * (through associateAddress()) - * @param node Node to be associated with an IP. - * @ip preallocated floating Ip - * @return associated public IP. - */ - public abstract String associatePredefinedAddress(NodeMetadata node, String ip); - - /** - * This will deallocate/release the given IP address back to pool. - * @param iaasInfo corresponding {@link IaasProvider} - * @param ip public IP address to be released. - */ - public abstract void releaseAddress(String ip); - - /** - * This method should create a Key Pair corresponds to a given public key in the respective region having the name given. - * Also should override the value of the key pair in the {@link Template} of this IaaS. - * @param region region that the key pair will get created. - * @param keyPairName name of the key pair. NOTE: Jclouds adds a prefix : <code>jclouds#</code> - * @param publicKey public key, from which the key pair will be created. - * @return whether the key pair creation is successful or not. - */ - public abstract boolean createKeyPairFromPublicKey(String region, String keyPairName, String publicKey); - - /** - * Validate a given region name against a particular IaaS. - * If a particular IaaS doesn't have a concept called region, it can simply throw {@link InvalidRegionException}. - * @param region name of the region. - * @return whether the region is valid. - * @throws InvalidRegionException if the region is invalid. - */ - public abstract boolean isValidRegion(String region) throws InvalidRegionException; - - /** - * Validate a given zone name against a particular region in an IaaS. - * If a particular IaaS doesn't have a concept called zone, it can simply throw {@link InvalidZoneException}. - * @param region region of the IaaS that the zone belongs to. - * @param zone - * @return whether the zone is valid in the given region or not. - * @throws InvalidZoneException if the zone is invalid in a given region. - */ - public abstract boolean isValidZone(String region, String zone) throws InvalidZoneException, InvalidRegionException; - - /** - * Validate a given host id against a particular zone in an IaaS. - * If a particular IaaS doesn't have a concept called hosts, it can simply throw {@link InvalidHostException}. - * @param zone zone of the IaaS that the host belongs to. - * @param host - * @return whether the host is valid in the given zone or not. - * @throws InvalidHostException if the host is invalid in a given zone. - */ - public abstract boolean isValidHost(String zone, String host) throws InvalidHostException; - - /** - * provides the {@link PartitionValidator} corresponds to this particular IaaS. - * @return {@link PartitionValidator} - */ - public abstract PartitionValidator getPartitionValidator(); - - /** - * Builds only the jclouds {@link Template} - */ - public abstract void buildTemplate(); - - /** - * Create a new volume in the respective Iaas. - * @param sizeGB size of the volume in Giga Bytes. - * @return Id of the created volume. - */ - public abstract String createVolume(int sizeGB, String snapshotId); - - /** - * Attach a given volume to an instance at the specified device path. - * @param instanceId of the instance. - * @param volumeId volume id of the volume to be attached. - * @param deviceName name of the device that the volume would bind to. - * @return the status of the attachment. - */ - public abstract String attachVolume(String instanceId, String volumeId, String deviceName); - - /** - * Detach a given volume from the given instance. - * @param instanceId of the instance. - * @param volumeId volume id of the volume to be detached. - */ - public abstract void detachVolume(String instanceId, String volumeId); - - /** - * Delete a given volume. - * @param volumeId volume id of the volume to be detached. - */ - public abstract void deleteVolume(String volumeId); - - /** - * This returns the device of the volume specified by the user. This is depends on IAAS. - * For an instance /dev/sdf maps to /dev/xvdf in EC2. - */ - public abstract String getIaasDevice(String device); -} http://git-wip-us.apache.org/repos/asf/stratos/blob/8c359dc1/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java index b07a875..56f9d26 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java @@ -23,14 +23,13 @@ package org.apache.stratos.cloud.controller.internal; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.stratos.cloud.controller.receiver.application.ApplicationTopicReceiver; -import org.apache.stratos.cloud.controller.receiver.cluster.status.ClusterStatusTopicReceiver; +import org.apache.stratos.cloud.controller.messaging.receiver.application.ApplicationTopicReceiver; +import org.apache.stratos.cloud.controller.messaging.receiver.cluster.status.ClusterStatusTopicReceiver; import org.apache.stratos.cloud.controller.exception.CloudControllerException; -import org.apache.stratos.cloud.controller.impl.CloudControllerServiceImpl; -import org.apache.stratos.cloud.controller.interfaces.CloudControllerService; -import org.apache.stratos.cloud.controller.publisher.TopologySynchronizerTaskScheduler; -import org.apache.stratos.cloud.controller.receiver.instance.status.InstanceStatusTopicReceiver; -import org.apache.stratos.cloud.controller.util.ServiceReferenceHolder; +import org.apache.stratos.cloud.controller.services.CloudControllerServiceImpl; +import org.apache.stratos.cloud.controller.services.CloudControllerService; +import org.apache.stratos.cloud.controller.messaging.publisher.TopologySynchronizerTaskScheduler; +import org.apache.stratos.cloud.controller.messaging.receiver.instance.status.InstanceStatusTopicReceiver; import org.apache.stratos.messaging.broker.publish.EventPublisherPool; import org.apache.stratos.messaging.util.Util; import org.osgi.framework.BundleContext; http://git-wip-us.apache.org/repos/asf/stratos/blob/8c359dc1/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/ServiceReferenceHolder.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/ServiceReferenceHolder.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/ServiceReferenceHolder.java new file mode 100644 index 0000000..780722d --- /dev/null +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/ServiceReferenceHolder.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.cloud.controller.internal; + +import org.apache.axis2.engine.AxisConfiguration; +import org.wso2.carbon.caching.impl.DistributedMapProvider; +import org.wso2.carbon.ntask.core.service.TaskService; +import org.wso2.carbon.registry.core.Registry; +import org.wso2.carbon.registry.core.session.UserRegistry; + +/** + * Singleton class to hold all the service references. + */ +public class ServiceReferenceHolder { + + private static ServiceReferenceHolder instance; + private TaskService taskService; + private Registry registry; + private AxisConfiguration axisConfiguration; + private DistributedMapProvider distributedMapProvider; + + private ServiceReferenceHolder() { + } + + public static ServiceReferenceHolder getInstance() { + if (instance == null) { + instance = new ServiceReferenceHolder(); + } + return instance; + } + + public void setAxisConfiguration(AxisConfiguration axisConfiguration) { + this.axisConfiguration = axisConfiguration; + } + + public AxisConfiguration getAxisConfiguration() { + return axisConfiguration; + } + + public TaskService getTaskService() { + return taskService; + } + + public void setTaskService(TaskService taskService) { + this.taskService = taskService; + } + + public void setRegistry(UserRegistry governanceSystemRegistry) { + registry = governanceSystemRegistry; + } + + public Registry getRegistry() { + return registry; + } + + public void setDistributedMapProvider(DistributedMapProvider distributedMapProvider) { + this.distributedMapProvider = distributedMapProvider; + } + + public DistributedMapProvider getDistributedMapProvider() { + return distributedMapProvider; + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/8c359dc1/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/jcloud/ComputeServiceBuilderUtil.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/jcloud/ComputeServiceBuilderUtil.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/jcloud/ComputeServiceBuilderUtil.java deleted file mode 100644 index d8bda9e..0000000 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/jcloud/ComputeServiceBuilderUtil.java +++ /dev/null @@ -1,188 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.stratos.cloud.controller.jcloud; - -import com.google.common.collect.ImmutableSet; -import com.google.inject.Module; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.stratos.cloud.controller.exception.CloudControllerException; -import org.apache.stratos.cloud.controller.pojo.IaasProvider; -import org.apache.stratos.cloud.controller.util.CloudControllerConstants; -import org.jclouds.ContextBuilder; -import org.jclouds.compute.ComputeServiceContext; -import org.jclouds.enterprise.config.EnterpriseConfigurationModule; -import org.jclouds.logging.slf4j.config.SLF4JLoggingModule; -import org.jclouds.sshj.config.SshjSshClientModule; - -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.util.Map; -import java.util.Properties; - -/** - * This class is responsible for creating a JClouds specific ComputeService object. - */ -public class ComputeServiceBuilderUtil { - - private static final Log log = LogFactory.getLog(ComputeServiceBuilderUtil.class); - - public static byte[] getUserData(String payloadFileName) { - byte[] bytes = null; - try { - File file = new File(payloadFileName); - if (!file.exists()) { - handleException("Payload file " + payloadFileName + " does not exist"); - } - if (!file.canRead()) { - handleException("Payload file " + payloadFileName + " does cannot be read"); - } - bytes = getBytesFromFile(file); - - } catch (IOException e) { - handleException("Cannot read data from payload file " + payloadFileName, e); - } - return bytes; - } - - - public static void buildDefaultComputeService(IaasProvider iaas) { - - Properties properties = new Properties(); - - // load properties - for (Map.Entry<String, String> entry : iaas.getProperties().entrySet()) { - properties.put(entry.getKey(), entry.getValue()); - } - - // set modules - Iterable<Module> modules = - ImmutableSet.<Module> of(new SshjSshClientModule(), new SLF4JLoggingModule(), - new EnterpriseConfigurationModule()); - - // build context - ContextBuilder builder = - ContextBuilder.newBuilder(iaas.getProvider()) - .credentials(iaas.getIdentity(), iaas.getCredential()).modules(modules) - .overrides(properties); - - // set the compute service object - iaas.setComputeService(builder.buildView(ComputeServiceContext.class).getComputeService()); - } - - public static String extractRegion(IaasProvider iaas) { - String region; - // try to find region - if ((region = iaas.getProperty(CloudControllerConstants.REGION_PROPERTY)) == null) { - // if the property, isn't specified, try to obtain from the image id - // image id can be in following format - {region}/{UUID} - region = iaas.getImage().contains("/") ? iaas.getImage().split("/")[0] : null; - } - - return region; - } - - public static String extractZone(IaasProvider iaas) { - - return iaas.getProperty(CloudControllerConstants.AVAILABILITY_ZONE); - } - - /** Returns the contents of the file in a byte array - * - * @param file - * - Input File - * @return Bytes from the file - * @throws java.io.IOException - * , if retrieving the file contents failed. - */ - public static byte[] getBytesFromFile(File file) throws IOException { - if (!file.exists()) { - log.error("Payload file " + file.getAbsolutePath() + " does not exist"); - return null; - } - InputStream is = new FileInputStream(file); - byte[] bytes; - - try { - // Get the size of the file - long length = file.length(); - - // You cannot create an array using a long type. - // It needs to be an int type. - // Before converting to an int type, check - // to ensure that file is not larger than Integer.MAX_VALUE. - if (length > Integer.MAX_VALUE) { - if (log.isDebugEnabled()) { - log.debug("File is too large"); - } - } - - // Create the byte array to hold the data - bytes = new byte[(int) length]; - - // Read in the bytes - int offset = 0; - int numRead; - while (offset < bytes.length && - (numRead = is.read(bytes, offset, bytes.length - offset)) >= 0) { - offset += numRead; - } - - // Ensure all the bytes have been read in - if (offset < bytes.length) { - throw new IOException("Could not completely read file " + file.getName()); - } - } finally { - // Close the input stream and return bytes - is.close(); - } - - return bytes; - } - - /** - * handles the exception - * - * @param msg - * exception message - */ - private static void handleException(String msg) { - log.error(msg); - throw new CloudControllerException(msg); - } - - /** - * handles the exception - * - * @param msg - * exception message - * @param e - * exception - */ - private static void handleException(String msg, Exception e) { - log.error(msg, e); - throw new CloudControllerException(msg, e); - - } - - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/8c359dc1/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/CartridgeInstanceDataPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/CartridgeInstanceDataPublisher.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/CartridgeInstanceDataPublisher.java new file mode 100644 index 0000000..6ad0258 --- /dev/null +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/CartridgeInstanceDataPublisher.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.cloud.controller.messaging.publisher; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.cloud.controller.exception.CloudControllerException; +import org.apache.stratos.cloud.controller.domain.Cartridge; +import org.apache.stratos.cloud.controller.domain.MemberContext; +import org.apache.stratos.cloud.controller.context.FasterLookUpDataHolder; +import org.apache.stratos.cloud.controller.util.CloudControllerConstants; +import org.jclouds.compute.domain.NodeMetadata; +import org.wso2.carbon.base.ServerConfiguration; +import org.wso2.carbon.databridge.agent.thrift.AsyncDataPublisher; +import org.wso2.carbon.databridge.agent.thrift.exception.AgentException; +import org.wso2.carbon.databridge.commons.Attribute; +import org.wso2.carbon.databridge.commons.AttributeType; +import org.wso2.carbon.databridge.commons.Event; +import org.wso2.carbon.databridge.commons.StreamDefinition; +import org.wso2.carbon.utils.CarbonUtils; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +/** + * This will publish the state changes of a node in the topology to a data receiver + */ +public class CartridgeInstanceDataPublisher { + + private static final Log log = LogFactory.getLog(CartridgeInstanceDataPublisher.class); + private static AsyncDataPublisher dataPublisher; + private static StreamDefinition streamDefinition; + private static final String cloudControllerEventStreamVersion = "1.0.0"; + + public static void publish(String memberId, + String partitionId, + String networkId, + String clusterId, + String serviceName, + String status, + NodeMetadata metadata) { + if(!FasterLookUpDataHolder.getInstance().getEnableBAMDataPublisher()){ + return; + } + log.debug(CloudControllerConstants.DATA_PUB_TASK_NAME+" cycle started."); + + if(dataPublisher==null){ + createDataPublisher(); + + //If we cannot create a data publisher we should give up + //this means data will not be published + if(dataPublisher == null){ + log.error("Data Publisher cannot be created or found."); + release(); + return; + } + } + + + MemberContext memberContext = FasterLookUpDataHolder.getInstance().getMemberContextOfMemberId(memberId); + String cartridgeType = memberContext.getCartridgeType(); + Cartridge cartridge = FasterLookUpDataHolder.getInstance().getCartridge(cartridgeType); + + //Construct the data to be published + List<Object> payload = new ArrayList<Object>(); + // Payload values + payload.add(memberId); + payload.add(serviceName); + payload.add(clusterId); + payload.add(handleNull(memberContext.getLbClusterId())); + payload.add(handleNull(partitionId)); + payload.add(handleNull(networkId)); + if (cartridge != null) { + payload.add(handleNull(String.valueOf(cartridge.isMultiTenant()))); + } else { + payload.add(""); + } + payload.add(handleNull(memberContext.getPartition().getProvider())); + payload.add(handleNull(status)); + + if(metadata != null) { + payload.add(metadata.getHostname()); + payload.add(metadata.getHardware().getHypervisor()); + payload.add(String.valueOf(metadata.getHardware().getRam())); + payload.add(metadata.getImageId()); + payload.add(metadata.getLoginPort()); + payload.add(metadata.getOperatingSystem().getName()); + payload.add(metadata.getOperatingSystem().getVersion()); + payload.add(metadata.getOperatingSystem().getArch()); + payload.add(String.valueOf(metadata.getOperatingSystem().is64Bit())); + } else { + payload.add(""); + payload.add(""); + payload.add(""); + payload.add(""); + payload.add(0); + payload.add(""); + payload.add(""); + payload.add(""); + payload.add(""); + } + + payload.add(handleNull(memberContext.getPrivateIpAddress())); + payload.add(handleNull(memberContext.getPublicIpAddress())); + payload.add(handleNull(memberContext.getAllocatedIpAddress())); + + Event event = new Event(); + event.setPayloadData(payload.toArray()); + event.setArbitraryDataMap(new HashMap<String, String>()); + + try { + if (log.isDebugEnabled()) { + log.debug(String.format("Publishing BAM event: [stream] %s [version] %s", streamDefinition.getName(), streamDefinition.getVersion())); + } + dataPublisher.publish(streamDefinition.getName(), streamDefinition.getVersion(), event); + } catch (AgentException e) { + if (log.isErrorEnabled()) { + log.error(String.format("Could not publish BAM event: [stream] %s [version] %s", streamDefinition.getName(), streamDefinition.getVersion()), e); + } + } + } + + private static void release(){ + FasterLookUpDataHolder.getInstance().setPublisherRunning(false); + } + + private static StreamDefinition initializeStream() throws Exception { + streamDefinition = new StreamDefinition( + CloudControllerConstants.CLOUD_CONTROLLER_EVENT_STREAM, + cloudControllerEventStreamVersion); + streamDefinition.setNickName("cloud.controller"); + streamDefinition.setDescription("Instances booted up by the Cloud Controller"); + // Payload definition + List<Attribute> payloadData = new ArrayList<Attribute>(); + payloadData.add(new Attribute(CloudControllerConstants.MEMBER_ID_COL, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.CARTRIDGE_TYPE_COL, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.CLUSTER_ID_COL, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.LB_CLUSTER_ID_COL, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.PARTITION_ID_COL, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.NETWORK_ID_COL, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.IS_MULTI_TENANT_COL, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.IAAS_COL, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.STATUS_COL, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.HOST_NAME_COL, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.HYPERVISOR_COL, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.RAM_COL, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.IMAGE_ID_COL, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.LOGIN_PORT_COL, AttributeType.INT)); + payloadData.add(new Attribute(CloudControllerConstants.OS_NAME_COL, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.OS_VERSION_COL, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.OS_ARCH_COL, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.OS_BIT_COL, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.PRIV_IP_COL, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.PUB_IP_COL, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.ALLOCATE_IP_COL, AttributeType.STRING)); + streamDefinition.setPayloadData(payloadData); + return streamDefinition; + } + + + private static void createDataPublisher(){ + //creating the agent + + ServerConfiguration serverConfig = CarbonUtils.getServerConfiguration(); + String trustStorePath = serverConfig.getFirstProperty("Security.TrustStore.Location"); + String trustStorePassword = serverConfig.getFirstProperty("Security.TrustStore.Password"); + String bamServerUrl = serverConfig.getFirstProperty("BamServerURL"); + String adminUsername = FasterLookUpDataHolder.getInstance().getDataPubConfig().getBamUsername(); + String adminPassword = FasterLookUpDataHolder.getInstance().getDataPubConfig().getBamPassword(); + + System.setProperty("javax.net.ssl.trustStore", trustStorePath); + System.setProperty("javax.net.ssl.trustStorePassword", trustStorePassword); + + + try { + dataPublisher = new AsyncDataPublisher("tcp://" + bamServerUrl + "", adminUsername, adminPassword); + FasterLookUpDataHolder.getInstance().setDataPublisher(dataPublisher); + initializeStream(); + dataPublisher.addStreamDefinition(streamDefinition); + } catch (Exception e) { + String msg = "Unable to create a data publisher to " + bamServerUrl + + ". Usage Agent will not function properly. "; + log.error(msg, e); + throw new CloudControllerException(msg, e); + } + } + + private static String handleNull(String val) { + if (val == null) { + return ""; + } + return val; + } + + +} http://git-wip-us.apache.org/repos/asf/stratos/blob/8c359dc1/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologySynchronizerTaskScheduler.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologySynchronizerTaskScheduler.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologySynchronizerTaskScheduler.java new file mode 100644 index 0000000..c3f6da8 --- /dev/null +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/TopologySynchronizerTaskScheduler.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.cloud.controller.messaging.publisher; + +import java.util.HashMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.cloud.controller.context.FasterLookUpDataHolder; +import org.apache.stratos.cloud.controller.messaging.topology.TopologySynchronizerTask; +import org.apache.stratos.cloud.controller.util.CloudControllerConstants; +import org.wso2.carbon.ntask.common.TaskException; +import org.wso2.carbon.ntask.core.TaskInfo; +import org.wso2.carbon.ntask.core.TaskManager; +import org.wso2.carbon.ntask.core.service.TaskService; + +/** + * Topology synchronizer task scheduler for scheduling the topology synchronizer task + * using carbon task service. + */ +public class TopologySynchronizerTaskScheduler { + + private static final Log log = LogFactory.getLog(TopologySynchronizerTaskScheduler.class); + + private static final FasterLookUpDataHolder dataHolder = FasterLookUpDataHolder.getInstance(); + + public static void schedule(TaskService taskService) { + TaskManager taskManager = null; + try { + + if (!taskService.getRegisteredTaskTypes().contains(CloudControllerConstants.TOPOLOGY_SYNC_TASK_TYPE)) { + // Register task type + taskService.registerTaskType(CloudControllerConstants.TOPOLOGY_SYNC_TASK_TYPE); + + // Register task + taskManager = taskService.getTaskManager(CloudControllerConstants.TOPOLOGY_SYNC_TASK_TYPE); + String cronProp = dataHolder.getTopologyConfig().getProperty(CloudControllerConstants.CRON_PROPERTY); + String cron = cronProp != null ? cronProp :CloudControllerConstants.TOPOLOGY_SYNC_CRON ; + TaskInfo.TriggerInfo triggerInfo = new TaskInfo.TriggerInfo(cron); + TaskInfo taskInfo = new TaskInfo(CloudControllerConstants.TOPOLOGY_SYNC_TASK_NAME, + TopologySynchronizerTask.class.getName(), + new HashMap<String, String>(), triggerInfo); + taskManager.registerTask(taskInfo); + if(log.isDebugEnabled()) { + log.debug(String.format("Topology synchronization task scheduled: %s", CloudControllerConstants.TOPOLOGY_SYNC_TASK_NAME)); + } + } + + } catch (Exception e) { + if (taskManager != null) { + try { + taskManager.deleteTask(CloudControllerConstants.TOPOLOGY_SYNC_TASK_NAME); + } catch (TaskException te) { + if (log.isErrorEnabled()) { + log.error(te); + } + } + } + + String msg = String.format("Could not schedule topology synchronization task: %s", CloudControllerConstants.TOPOLOGY_SYNC_TASK_NAME); + log.error(msg, e); + throw new RuntimeException(msg, e); + } + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/8c359dc1/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationTopicReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationTopicReceiver.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationTopicReceiver.java new file mode 100644 index 0000000..3e045a9 --- /dev/null +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationTopicReceiver.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.cloud.controller.messaging.receiver.application; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.cloud.controller.messaging.topology.TopologyBuilder; +import org.apache.stratos.messaging.event.Event; +import org.apache.stratos.messaging.event.applications.ApplicationTerminatedEvent; +import org.apache.stratos.messaging.listener.applications.ApplicationTerminatedEventListener; +import org.apache.stratos.messaging.message.receiver.applications.ApplicationsEventReceiver; + +/** + * This is to receive the application topic messages. + */ +public class ApplicationTopicReceiver implements Runnable{ + private static final Log log = LogFactory.getLog(ApplicationTopicReceiver.class); + private ApplicationsEventReceiver applicationsEventReceiver; + private boolean terminated; + + public ApplicationTopicReceiver() { + this.applicationsEventReceiver = new ApplicationsEventReceiver(); + addEventListeners(); + + } + + + @Override + public void run() { + + if (log.isInfoEnabled()) { + log.info("Cloud controller application status thread started"); + } + Thread thread = new Thread(applicationsEventReceiver); + thread.start(); + + // Keep the thread live until terminated + while (!terminated) { + try { + Thread.sleep(1000); + } catch (InterruptedException ignore) { + } + } + if (log.isInfoEnabled()) { + log.info("Cloud controller application status thread terminated"); + } + + } + private void addEventListeners() { + applicationsEventReceiver.addEventListener(new ApplicationTerminatedEventListener() { + @Override + protected void onEvent(Event event) { + //Remove the application related data + ApplicationTerminatedEvent terminatedEvent = (ApplicationTerminatedEvent)event; + log.info("ApplicationTerminatedEvent received for [application] " + terminatedEvent.getAppId()); + String appId = terminatedEvent.getAppId(); + TopologyBuilder.handleApplicationClustersRemoved(appId, terminatedEvent.getClusterData()); + } + }); + } + + public void setTerminated(boolean terminated) { + this.terminated = terminated; + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/8c359dc1/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java new file mode 100644 index 0000000..b4ff9f2 --- /dev/null +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.cloud.controller.messaging.receiver.cluster.status; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.cloud.controller.messaging.topology.TopologyBuilder; +import org.apache.stratos.messaging.event.Event; +import org.apache.stratos.messaging.event.cluster.status.*; +import org.apache.stratos.messaging.listener.cluster.status.*; +import org.apache.stratos.messaging.message.receiver.cluster.status.ClusterStatusEventReceiver; + +public class ClusterStatusTopicReceiver implements Runnable{ + private static final Log log = LogFactory.getLog(ClusterStatusTopicReceiver.class); + + private ClusterStatusEventReceiver statusEventReceiver; + private boolean terminated; + + public ClusterStatusTopicReceiver() { + this.statusEventReceiver = new ClusterStatusEventReceiver(); + addEventListeners(); + } + + public void run() { + Thread thread = new Thread(statusEventReceiver); + thread.start(); + if (log.isInfoEnabled()) { + log.info("Cloud controller Cluster status thread started"); + } + + // Keep the thread live until terminated + while (!terminated) { + try { + Thread.sleep(1000); + } catch (InterruptedException ignore) { + } + } + if (log.isInfoEnabled()) { + log.info("Cloud controller application status thread terminated"); + } + + } + private void addEventListeners() { + // Listen to topology events that affect clusters + statusEventReceiver.addEventListener(new ClusterStatusClusterResetEventListener() { + @Override + protected void onEvent(Event event) { + TopologyBuilder.handleClusterReset((ClusterStatusClusterResetEvent) event); + } + }); + + statusEventReceiver.addEventListener(new ClusterStatusClusterInstanceCreatedEventListener() { + @Override + protected void onEvent(Event event) { + //TopologyBuilder.handleClusterInstanceCreated((ClusterStatusClusterInstanceCreatedEvent) event); + } + }); + + statusEventReceiver.addEventListener(new ClusterStatusClusterCreatedEventListener() { + @Override + protected void onEvent(Event event) { + TopologyBuilder.handleClusterCreated((ClusterStatusClusterCreatedEvent) event); + } + }); + + statusEventReceiver.addEventListener(new ClusterStatusClusterActivatedEventListener() { + @Override + protected void onEvent(Event event) { + TopologyBuilder.handleClusterActivatedEvent((ClusterStatusClusterActivatedEvent) event); + } + }); + + statusEventReceiver.addEventListener(new ClusterStatusClusterTerminatedEventListener() { + @Override + protected void onEvent(Event event) { + TopologyBuilder.handleClusterTerminatedEvent((ClusterStatusClusterTerminatedEvent) event); + } + }); + + statusEventReceiver.addEventListener(new ClusterStatusClusterTerminatingEventListener() { + @Override + protected void onEvent(Event event) { + TopologyBuilder.handleClusterTerminatingEvent((ClusterStatusClusterTerminatingEvent) event); + } + }); + + statusEventReceiver.addEventListener(new ClusterStatusClusterInactivateEventListener() { + @Override + protected void onEvent(Event event) { + TopologyBuilder.handleClusterInActivateEvent((ClusterStatusClusterInactivateEvent) event); + } + }); + } + + public void setTerminated(boolean terminated) { + this.terminated = terminated; + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/8c359dc1/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java new file mode 100644 index 0000000..fd24c60 --- /dev/null +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.cloud.controller.messaging.receiver.instance.status; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.cloud.controller.messaging.topology.TopologyBuilder; +import org.apache.stratos.messaging.event.Event; +import org.apache.stratos.messaging.event.instance.status.InstanceActivatedEvent; +import org.apache.stratos.messaging.event.instance.status.InstanceMaintenanceModeEvent; +import org.apache.stratos.messaging.event.instance.status.InstanceReadyToShutdownEvent; +import org.apache.stratos.messaging.event.instance.status.InstanceStartedEvent; +import org.apache.stratos.messaging.listener.instance.status.InstanceActivatedEventListener; +import org.apache.stratos.messaging.listener.instance.status.InstanceMaintenanceListener; +import org.apache.stratos.messaging.listener.instance.status.InstanceReadyToShutdownEventListener; +import org.apache.stratos.messaging.listener.instance.status.InstanceStartedEventListener; +import org.apache.stratos.messaging.message.receiver.instance.status.InstanceStatusEventReceiver; + +/** + * This will handle the instance status events + */ +public class InstanceStatusTopicReceiver implements Runnable { + private static final Log log = LogFactory.getLog(InstanceStatusTopicReceiver.class); + + private InstanceStatusEventReceiver statusEventReceiver; + private boolean terminated; + + public InstanceStatusTopicReceiver() { + this.statusEventReceiver = new InstanceStatusEventReceiver(); + addEventListeners(); + } + + + @Override + public void run() { + Thread thread = new Thread(statusEventReceiver); + thread.start(); + if (log.isInfoEnabled()) { + log.info("Cloud controller application status thread started"); + } + ///* Keep the thread live until terminated + while (!terminated) { + try { + Thread.sleep(1000); + } catch (InterruptedException ignore) { + } + } + if (log.isInfoEnabled()) { + log.info("Cloud controller application status thread terminated"); + } + } + + private void addEventListeners() { + statusEventReceiver.addEventListener(new InstanceActivatedEventListener() { + @Override + protected void onEvent(Event event) { + TopologyBuilder.handleMemberActivated((InstanceActivatedEvent) event); + } + }); + + statusEventReceiver.addEventListener(new InstanceStartedEventListener() { + @Override + protected void onEvent(Event event) { + TopologyBuilder.handleMemberStarted((InstanceStartedEvent) event); + } + }); + + statusEventReceiver.addEventListener(new InstanceReadyToShutdownEventListener() { + @Override + protected void onEvent(Event event) { + try { + TopologyBuilder.handleMemberReadyToShutdown((InstanceReadyToShutdownEvent) event); + } catch (Exception e) { + String error = "Failed to retrieve the instance status event message"; + log.error(error, e); + } + } + }); + + statusEventReceiver.addEventListener(new InstanceMaintenanceListener() { + @Override + protected void onEvent(Event event) { + try { + TopologyBuilder.handleMemberMaintenance((InstanceMaintenanceModeEvent) event); + } catch (Exception e) { + String error = "Failed to retrieve the instance status event message"; + log.error(error, e); + } + } + }); + + + } + +}
