http://git-wip-us.apache.org/repos/asf/stratos/blob/8c359dc1/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java new file mode 100644 index 0000000..e4079d0 --- /dev/null +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java @@ -0,0 +1,962 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.cloud.controller.messaging.topology; + +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.cloud.controller.domain.*; +import org.apache.stratos.cloud.controller.domain.Cartridge; +import org.apache.stratos.cloud.controller.exception.CloudControllerException; +import org.apache.stratos.cloud.controller.exception.InvalidCartridgeTypeException; +import org.apache.stratos.cloud.controller.exception.InvalidMemberException; +import org.apache.stratos.cloud.controller.messaging.publisher.CartridgeInstanceDataPublisher; +import org.apache.stratos.cloud.controller.registry.RegistryManager; +import org.apache.stratos.cloud.controller.context.FasterLookUpDataHolder; +import org.apache.stratos.cloud.controller.util.CloudControllerUtil; +import org.apache.stratos.common.constants.StratosConstants; +import org.apache.stratos.messaging.domain.applications.ClusterDataHolder; +import org.apache.stratos.messaging.domain.instance.ClusterInstance; +import org.apache.stratos.messaging.domain.topology.*; +import org.apache.stratos.messaging.event.applications.ApplicationTerminatedEvent; +import org.apache.stratos.messaging.event.cluster.status.*; +import org.apache.stratos.messaging.event.instance.status.InstanceActivatedEvent; +import org.apache.stratos.messaging.event.instance.status.InstanceMaintenanceModeEvent; +import org.apache.stratos.messaging.event.instance.status.InstanceReadyToShutdownEvent; +import org.apache.stratos.messaging.event.instance.status.InstanceStartedEvent; +import org.apache.stratos.messaging.event.topology.*; +import org.apache.stratos.metadata.client.defaults.DefaultMetaDataServiceClient; +import org.apache.stratos.metadata.client.defaults.MetaDataServiceClient; +import org.wso2.carbon.registry.core.exceptions.RegistryException; + +import java.util.*; + +/** + * this is to manipulate the received events by cloud controller + * and build the complete topology with the events received + */ +public class TopologyBuilder { + private static final Log log = LogFactory.getLog(TopologyBuilder.class); + + + public static void handleServiceCreated(List<Cartridge> cartridgeList) { + Service service; + Topology topology = TopologyManager.getTopology(); + if (cartridgeList == null) { + log.warn(String.format("Cartridge list is empty")); + return; + } + + try { + + TopologyManager.acquireWriteLock(); + for (Cartridge cartridge : cartridgeList) { + if (!topology.serviceExists(cartridge.getType())) { + service = new Service(cartridge.getType(), cartridge.isMultiTenant() ? ServiceType.MultiTenant : ServiceType.SingleTenant); + List<PortMapping> portMappings = cartridge.getPortMappings(); + Properties properties = new Properties(); + for (Map.Entry<String, String> entry : cartridge.getProperties().entrySet()) { + properties.setProperty(entry.getKey(), entry.getValue()); + } + service.setProperties(properties); + Port port; + //adding ports to the event + for (PortMapping portMapping : portMappings) { + port = new Port(portMapping.getProtocol(), + Integer.parseInt(portMapping.getPort()), + Integer.parseInt(portMapping.getProxyPort())); + service.addPort(port); + } + topology.addService(service); + TopologyManager.updateTopology(topology); + } + } + } finally { + TopologyManager.releaseWriteLock(); + } + TopologyEventPublisher.sendServiceCreateEvent(cartridgeList); + + } + + public static void handleServiceRemoved(List<Cartridge> cartridgeList) { + Topology topology = TopologyManager.getTopology(); + + for (Cartridge cartridge : cartridgeList) { + if (topology.getService(cartridge.getType()).getClusters().size() == 0) { + if (topology.serviceExists(cartridge.getType())) { + try { + TopologyManager.acquireWriteLock(); + topology.removeService(cartridge.getType()); + TopologyManager.updateTopology(topology); + } finally { + TopologyManager.releaseWriteLock(); + } + TopologyEventPublisher.sendServiceRemovedEvent(cartridgeList); + } else { + log.warn(String.format("Service %s does not exist..", cartridge.getType())); + } + } else { + log.warn("Subscription already exists. Hence not removing the service:" + cartridge.getType() + + " from the topology"); + } + } + } + + public static void handleClusterCreated(ClusterStatusClusterCreatedEvent event) { + TopologyManager.acquireWriteLock(); + Cluster cluster; + + try { + Topology topology = TopologyManager.getTopology(); + Service service = topology.getService(event.getServiceName()); + if (service == null) { + log.error("Service " + event.getServiceName() + + " not found in Topology, unable to update the cluster status to Created"); + return; + } + + if (service.clusterExists(event.getClusterId())) { + log.warn("Cluster " + event.getClusterId() + " is already in the Topology "); + return; + } else { + cluster = new Cluster(event.getServiceName(), + event.getClusterId(), event.getDeploymentPolicyName(), + event.getAutosScalePolicyName(), event.getAppId()); + //cluster.setStatus(Status.Created); + cluster.setHostNames(event.getHostNames()); + cluster.setTenantRange(event.getTenantRange()); + service.addCluster(cluster); + TopologyManager.updateTopology(topology); + } + } finally { + TopologyManager.releaseWriteLock(); + } + + TopologyEventPublisher.sendClusterCreatedEvent(cluster); + } + + public static void handleApplicationClustersCreated(String appId, List<Cluster> appClusters) { + + TopologyManager.acquireWriteLock(); + + try { + Topology topology = TopologyManager.getTopology(); + + for (Cluster cluster : appClusters) { + Service service = topology.getService(cluster.getServiceName()); + if (service == null) { + log.error("Service " + cluster.getServiceName() + + " not found in Topology, unable to create Application cluster"); + } else { + service.addCluster(cluster); + log.info("Application Cluster " + cluster.getClusterId() + " created in CC topology"); + } + } + + TopologyManager.updateTopology(topology); + + } finally { + TopologyManager.releaseWriteLock(); + } + + TopologyEventPublisher.sendApplicationClustersCreated(appId, appClusters); + + } + + public static void handleApplicationClustersRemoved(String appId, Set<ClusterDataHolder> clusterData) { + TopologyManager.acquireWriteLock(); + + List<Cluster> removedClusters = new ArrayList<Cluster>(); + FasterLookUpDataHolder dataHolder = FasterLookUpDataHolder.getInstance(); + try { + Topology topology = TopologyManager.getTopology(); + + if (clusterData != null) { + // remove clusters from CC topology model and remove runtime information + for (ClusterDataHolder aClusterData : clusterData) { + Service aService = topology.getService(aClusterData.getServiceType()); + if (aService != null) { + removedClusters.add(aService.removeCluster(aClusterData.getClusterId())); + } else { + log.warn("Service " + aClusterData.getServiceType() + " not found, unable to remove Cluster " + aClusterData.getClusterId()); + } + // remove runtime data + dataHolder.removeClusterContext(aClusterData.getClusterId()); + + log.info("Removed application [ " + appId + " ]'s Cluster [ " + aClusterData.getClusterId() + " ] from the topology"); + } + // persist runtime data changes + persist(dataHolder); + } else { + log.info("No cluster data found for application " + appId + " to remove"); + } + + TopologyManager.updateTopology(topology); + + } finally { + TopologyManager.releaseWriteLock(); + } + + TopologyEventPublisher.sendApplicationClustersRemoved(appId, clusterData); + + } + + /** + * Persist data in registry. + */ + private static void persist(FasterLookUpDataHolder dataHolder) { + try { + RegistryManager.getInstance().persist( + dataHolder); + } catch (RegistryException e) { + + String msg = "Failed to persist the Cloud Controller data in registry. Further, transaction roll back also failed."; + log.fatal(msg); + throw new CloudControllerException(msg, e); + } + } + + public static void handleClusterReset(ClusterStatusClusterResetEvent event) { + + TopologyManager.acquireWriteLock(); + + try { + Topology topology = TopologyManager.getTopology(); + Service service = topology.getService(event.getServiceName()); + if (service == null) { + log.error("Service " + event.getServiceName() + + " not found in Topology, unable to update the cluster status to Created"); + return; + } + + Cluster cluster = service.getCluster(event.getClusterId()); + if (cluster == null) { + log.error("Cluster " + event.getClusterId() + " not found in Topology, unable to update " + + "status to Created"); + return; + } + + ClusterInstance context = cluster.getInstanceContexts(event.getInstanceId()); + if (context == null) { + log.warn("Cluster Instance Context is not found for [cluster] " + + event.getClusterId() + " [instance-id] " + + event.getInstanceId()); + return; + } + ClusterStatus status = ClusterStatus.Created; + if(context.isStateTransitionValid(status)) { + context.setStatus(status); + log.info("Cluster Created adding status started for" + cluster.getClusterId()); + TopologyManager.updateTopology(topology); + //publishing data + TopologyEventPublisher.sendClusterResetEvent(event.getAppId(), event.getServiceName(), + event.getClusterId(), event.getInstanceId()); + } else { + log.warn(String.format("Cluster state transition is not valid: [cluster-id] %s " + + " [instance-id] %s [current-status] %s [status-requested] %s", + event.getClusterId(), event.getInstanceId(), + context.getStatus(), status)); + } + + } finally { + TopologyManager.releaseWriteLock(); + } + + + } + + public static void handleClusterInstanceCreated(String serviceType, String clusterId, String alias, String instanceId) { + + TopologyManager.acquireWriteLock(); + + try { + Topology topology = TopologyManager.getTopology(); + Service service = topology.getService(serviceType); + if (service == null) { + log.error("Service " + serviceType + + " not found in Topology, unable to update the cluster status to Created"); + return; + } + + Cluster cluster = service.getCluster(clusterId); + if (cluster == null) { + log.error("Cluster " + clusterId + " not found in Topology, unable to update " + + "status to Created"); + return; + } + + if(cluster.getInstanceContexts(instanceId) != null) { + log.warn("The Instance context for the cluster already exists for [cluster] " + + clusterId + " [instance-id] " + instanceId); + return; + } + + //context.setStatus(ClusterStatus.Created); + cluster.addInstanceContext(instanceId, new ClusterInstance(alias, clusterId, instanceId)); + TopologyManager.updateTopology(topology); + + ClusterInstanceCreatedEvent clusterInstanceCreatedEvent = + new ClusterInstanceCreatedEvent(alias, serviceType, clusterId, instanceId); + TopologyEventPublisher.sendClusterInstanceCreatedEvent(clusterInstanceCreatedEvent); + + } finally { + TopologyManager.releaseWriteLock(); + } + } + + + + public static void handleClusterCreated(Registrant registrant, boolean isLb) { + /*Topology topology = TopologyManager.getTopology(); + Service service; + try { + TopologyManager.acquireWriteLock(); + String cartridgeType = registrant.getCartridgeType(); + service = topology.getService(cartridgeType); + Properties props = CloudControllerUtil.toJavaUtilProperties(registrant.getProperties()); + + Cluster cluster; + String clusterId = registrant.getClusterId(); + if (service.clusterExists(clusterId)) { + // update the cluster + cluster = service.getCluster(clusterId); + cluster.addHostName(registrant.getHostName()); + if (service.getServiceType() == ServiceType.MultiTenant) { + cluster.setTenantRange(registrant.getTenantRange()); + } + if (service.getProperties().getProperty(Constants.IS_PRIMARY) != null) { + props.setProperty(Constants.IS_PRIMARY, service.getProperties().getProperty(Constants.IS_PRIMARY)); + } + cluster.setProperties(props); + cluster.setLbCluster(isLb); + } else { + cluster = new Cluster(cartridgeType, clusterId, + registrant.getDeploymentPolicyName(), registrant.getAutoScalerPolicyName(), null); + cluster.addHostName(registrant.getHostName()); + if (service.getServiceType() == ServiceType.MultiTenant) { + cluster.setTenantRange(registrant.getTenantRange()); + } + if (service.getProperties().getProperty(Constants.IS_PRIMARY) != null) { + props.setProperty(Constants.IS_PRIMARY, service.getProperties().getProperty(Constants.IS_PRIMARY)); + } + cluster.setProperties(props); + cluster.setLbCluster(isLb); + //cluster.setStatus(Status.Created); + service.addCluster(cluster); + } + TopologyManager.updateTopology(topology); + TopologyEventPublisher.sendClusterCreatedEvent(cartridgeType, clusterId, cluster); + + } finally { + TopologyManager.releaseWriteLock(); + }*/ + } + + + private static void setKubernetesCluster(Cluster cluster) { + boolean isKubernetesCluster = (cluster.getProperties().getProperty(StratosConstants.KUBERNETES_CLUSTER_ID) != null); + if (log.isDebugEnabled()) { + log.debug(" Kubernetes Cluster ["+ isKubernetesCluster + "] "); + } + cluster.setKubernetesCluster(isKubernetesCluster); + } + + public static void handleClusterRemoved(ClusterContext ctxt) { + Topology topology = TopologyManager.getTopology(); + Service service = topology.getService(ctxt.getCartridgeType()); + String deploymentPolicy; + if (service == null) { + log.warn(String.format("Service %s does not exist", + ctxt.getCartridgeType())); + return; + } + + if (!service.clusterExists(ctxt.getClusterId())) { + log.warn(String.format("Cluster %s does not exist for service %s", + ctxt.getClusterId(), + ctxt.getCartridgeType())); + return; + } + + try { + TopologyManager.acquireWriteLock(); + Cluster cluster = service.removeCluster(ctxt.getClusterId()); + deploymentPolicy = cluster.getDeploymentPolicyName(); + TopologyManager.updateTopology(topology); + } finally { + TopologyManager.releaseWriteLock(); + } + TopologyEventPublisher.sendClusterRemovedEvent(ctxt, deploymentPolicy); + } + + public static void handleMemberSpawned(String serviceName, + String clusterId, String partitionId, + String privateIp, String publicIp, MemberContext context) { + // adding the new member to the cluster after it is successfully started + // in IaaS. + Topology topology = TopologyManager.getTopology(); + Service service = topology.getService(serviceName); + Cluster cluster = service.getCluster(clusterId); + String memberId = context.getMemberId(); + String networkPartitionId = context.getNetworkPartitionId(); + String lbClusterId = context.getLbClusterId(); + long initTime = context.getInitTime(); + + if (cluster.memberExists(memberId)) { + log.warn(String.format("Member %s already exists", memberId)); + return; + } + + try { + TopologyManager.acquireWriteLock(); + Member member = new Member(serviceName, clusterId, + networkPartitionId, partitionId, memberId, initTime); + member.setStatus(MemberStatus.Created); + member.setInstanceId(context.getInstanceId()); + member.setMemberIp(privateIp); + member.setLbClusterId(lbClusterId); + member.setMemberPublicIp(publicIp); + member.setProperties(CloudControllerUtil.toJavaUtilProperties(context.getProperties())); + try { + + Cartridge cartridge = FasterLookUpDataHolder.getInstance().getCartridge(serviceName); + List<PortMapping> portMappings = cartridge.getPortMappings(); + Port port; + if(cluster.isKubernetesCluster()){ + // Update port mappings with generated service proxy port + // TODO: Need to properly fix with the latest Kubernetes version + String serviceHostPortStr = CloudControllerUtil.getProperty(context.getProperties(), StratosConstants.ALLOCATED_SERVICE_HOST_PORT); + if(StringUtils.isEmpty(serviceHostPortStr)) { + log.warn("Kubernetes service host port not found for member: [member-id] " + memberId); + } + // Adding ports to the member + if (StringUtils.isNotEmpty(serviceHostPortStr)) { + for (PortMapping portMapping : portMappings) { + port = new Port(portMapping.getProtocol(), + Integer.parseInt(serviceHostPortStr), + Integer.parseInt(portMapping.getProxyPort())); + member.addPort(port); + } + } + + } else { + + // Adding ports to the member + for (PortMapping portMapping : portMappings) { + + port = new Port(portMapping.getProtocol(), + Integer.parseInt(portMapping.getPort()), + Integer.parseInt(portMapping.getProxyPort())); + member.addPort(port); + + } + } + + } catch (Exception e) { + log.error("Could not update member port-map: [member-id] " + memberId, e); + } + cluster.addMember(member); + TopologyManager.updateTopology(topology); + } finally { + TopologyManager.releaseWriteLock(); + } + + TopologyEventPublisher.sendInstanceSpawnedEvent(serviceName, clusterId, + networkPartitionId, partitionId, memberId, lbClusterId, + publicIp, privateIp, context); + } + + public static void handleMemberStarted(InstanceStartedEvent instanceStartedEvent) { + Topology topology = TopologyManager.getTopology(); + Service service = topology.getService(instanceStartedEvent.getServiceName()); + if (service == null) { + log.warn(String.format("Service %s does not exist", + instanceStartedEvent.getServiceName())); + return; + } + if (!service.clusterExists(instanceStartedEvent.getClusterId())) { + log.warn(String.format("Cluster %s does not exist in service %s", + instanceStartedEvent.getClusterId(), + instanceStartedEvent.getServiceName())); + return; + } + + Member member = service.getCluster(instanceStartedEvent.getClusterId()). + getMember(instanceStartedEvent.getMemberId()); + if (member == null) { + log.warn(String.format("Member %s does not exist", + instanceStartedEvent.getMemberId())); + return; + } + + try { + TopologyManager.acquireWriteLock(); + // try update lifecycle state + if (!member.isStateTransitionValid(MemberStatus.Starting)) { + log.error("Invalid State Transition from " + member.getStatus() + " to " + MemberStatus.Starting); + } + member.setStatus(MemberStatus.Starting); + log.info("member started event adding status started"); + + TopologyManager.updateTopology(topology); + } finally { + TopologyManager.releaseWriteLock(); + } + //memberStartedEvent. + TopologyEventPublisher.sendMemberStartedEvent(instanceStartedEvent); + //publishing data + CartridgeInstanceDataPublisher.publish(instanceStartedEvent.getMemberId(), + instanceStartedEvent.getPartitionId(), + instanceStartedEvent.getNetworkPartitionId(), + instanceStartedEvent.getClusterId(), + instanceStartedEvent.getServiceName(), + MemberStatus.Starting.toString(), + null); + } + + public static void handleMemberActivated(InstanceActivatedEvent instanceActivatedEvent) { + Topology topology = TopologyManager.getTopology(); + Service service = topology.getService(instanceActivatedEvent.getServiceName()); + if (service == null) { + log.warn(String.format("Service %s does not exist", + instanceActivatedEvent.getServiceName())); + return; + } + + Cluster cluster = service.getCluster(instanceActivatedEvent.getClusterId()); + if (cluster == null) { + log.warn(String.format("Cluster %s does not exist", + instanceActivatedEvent.getClusterId())); + return; + } + + Member member = cluster.getMember(instanceActivatedEvent.getMemberId()); + + if (member == null) { + log.warn(String.format("Member %s does not exist", + instanceActivatedEvent.getMemberId())); + return; + } + + MemberActivatedEvent memberActivatedEvent = new MemberActivatedEvent( + instanceActivatedEvent.getServiceName(), + instanceActivatedEvent.getClusterId(), + instanceActivatedEvent.getNetworkPartitionId(), + instanceActivatedEvent.getPartitionId(), + instanceActivatedEvent.getMemberId(), + instanceActivatedEvent.getInstanceId()); + + // grouping - set grouid + //TODO + memberActivatedEvent.setApplicationId(null); + try { + TopologyManager.acquireWriteLock(); + // try update lifecycle state + if (!member.isStateTransitionValid(MemberStatus.Activated)) { + log.error("Invalid State Transition from " + member.getStatus() + " to " + MemberStatus.Activated); + } + member.setStatus(MemberStatus.Activated); + log.info("member started event adding status activated"); + Cartridge cartridge = FasterLookUpDataHolder.getInstance(). + getCartridge(instanceActivatedEvent.getServiceName()); + + List<PortMapping> portMappings = cartridge.getPortMappings(); + Port port; + //adding ports to the event + for (PortMapping portMapping : portMappings) { + port = new Port(portMapping.getProtocol(), + Integer.parseInt(portMapping.getPort()), + Integer.parseInt(portMapping.getProxyPort())); + member.addPort(port); + memberActivatedEvent.addPort(port); + } + + memberActivatedEvent.setMemberIp(member.getMemberIp()); + memberActivatedEvent.setMemberPublicIp(member.getMemberPublicIp()); + TopologyManager.updateTopology(topology); + + } finally { + TopologyManager.releaseWriteLock(); + } + TopologyEventPublisher.sendMemberActivatedEvent(memberActivatedEvent); + //publishing data + CartridgeInstanceDataPublisher.publish(memberActivatedEvent.getMemberId(), + memberActivatedEvent.getPartitionId(), + memberActivatedEvent.getNetworkPartitionId(), + memberActivatedEvent.getClusterId(), + memberActivatedEvent.getServiceName(), + MemberStatus.Activated.toString(), + null); + } + + public static void handleMemberReadyToShutdown(InstanceReadyToShutdownEvent instanceReadyToShutdownEvent) + throws InvalidMemberException, InvalidCartridgeTypeException { + Topology topology = TopologyManager.getTopology(); + Service service = topology.getService(instanceReadyToShutdownEvent.getServiceName()); + //update the status of the member + if (service == null) { + log.warn(String.format("Service %s does not exist", + instanceReadyToShutdownEvent.getServiceName())); + return; + } + + Cluster cluster = service.getCluster(instanceReadyToShutdownEvent.getClusterId()); + if (cluster == null) { + log.warn(String.format("Cluster %s does not exist", + instanceReadyToShutdownEvent.getClusterId())); + return; + } + + + Member member = cluster.getMember(instanceReadyToShutdownEvent.getMemberId()); + if (member == null) { + log.warn(String.format("Member %s does not exist", + instanceReadyToShutdownEvent.getMemberId())); + return; + } + MemberReadyToShutdownEvent memberReadyToShutdownEvent = new MemberReadyToShutdownEvent( + instanceReadyToShutdownEvent.getServiceName(), + instanceReadyToShutdownEvent.getClusterId(), + instanceReadyToShutdownEvent.getNetworkPartitionId(), + instanceReadyToShutdownEvent.getPartitionId(), + instanceReadyToShutdownEvent.getMemberId(), + instanceReadyToShutdownEvent.getInstanceId()); + try { + TopologyManager.acquireWriteLock(); + + if (!member.isStateTransitionValid(MemberStatus.ReadyToShutDown)) { + log.error("Invalid State Transition from " + member.getStatus() + " to " + MemberStatus.ReadyToShutDown); + } + member.setStatus(MemberStatus.ReadyToShutDown); + log.info("Member Ready to shut down event adding status started"); + + TopologyManager.updateTopology(topology); + } finally { + TopologyManager.releaseWriteLock(); + } + TopologyEventPublisher.sendMemberReadyToShutdownEvent(memberReadyToShutdownEvent); + //publishing data + CartridgeInstanceDataPublisher.publish(instanceReadyToShutdownEvent.getMemberId(), + instanceReadyToShutdownEvent.getPartitionId(), + instanceReadyToShutdownEvent.getNetworkPartitionId(), + instanceReadyToShutdownEvent.getClusterId(), + instanceReadyToShutdownEvent.getServiceName(), + MemberStatus.ReadyToShutDown.toString(), + null); + //termination of particular instance will be handled by autoscaler + } + + public static void handleMemberMaintenance(InstanceMaintenanceModeEvent instanceMaintenanceModeEvent) + throws InvalidMemberException, InvalidCartridgeTypeException { + Topology topology = TopologyManager.getTopology(); + Service service = topology.getService(instanceMaintenanceModeEvent.getServiceName()); + //update the status of the member + if (service == null) { + log.warn(String.format("Service %s does not exist", + instanceMaintenanceModeEvent.getServiceName())); + return; + } + + Cluster cluster = service.getCluster(instanceMaintenanceModeEvent.getClusterId()); + if (cluster == null) { + log.warn(String.format("Cluster %s does not exist", + instanceMaintenanceModeEvent.getClusterId())); + return; + } + + Member member = cluster.getMember(instanceMaintenanceModeEvent.getMemberId()); + if (member == null) { + log.warn(String.format("Member %s does not exist", + instanceMaintenanceModeEvent.getMemberId())); + return; + } + + + MemberMaintenanceModeEvent memberMaintenanceModeEvent = new MemberMaintenanceModeEvent( + instanceMaintenanceModeEvent.getServiceName(), + instanceMaintenanceModeEvent.getClusterId(), + instanceMaintenanceModeEvent.getNetworkPartitionId(), + instanceMaintenanceModeEvent.getPartitionId(), + instanceMaintenanceModeEvent.getMemberId(), + instanceMaintenanceModeEvent.getInstanceId()); + try { + TopologyManager.acquireWriteLock(); + // try update lifecycle state + if (!member.isStateTransitionValid(MemberStatus.In_Maintenance)) { + log.error("Invalid State Transition from " + member.getStatus() + " to " + MemberStatus.In_Maintenance); + } + member.setStatus(MemberStatus.In_Maintenance); + log.info("member maintenance mode event adding status started"); + + TopologyManager.updateTopology(topology); + } finally { + TopologyManager.releaseWriteLock(); + } + //publishing data + TopologyEventPublisher.sendMemberMaintenanceModeEvent(memberMaintenanceModeEvent); + + } + + public static void handleMemberTerminated(String serviceName, String clusterId, + String networkPartitionId, String partitionId, + String memberId) { + Topology topology = TopologyManager.getTopology(); + Service service = topology.getService(serviceName); + Properties properties; + if (service == null) { + log.warn(String.format("Service %s does not exist", + serviceName)); + return; + } + Cluster cluster = service.getCluster(clusterId); + if (cluster == null) { + log.warn(String.format("Cluster %s does not exist", + clusterId)); + return; + } + + Member member = cluster.getMember(memberId); + String instanceId = member.getInstanceId(); + + if (member == null) { + log.warn(String.format("Member with member id %s does not exist", + memberId)); + return; + } + + try { + TopologyManager.acquireWriteLock(); + properties = member.getProperties(); + cluster.removeMember(member); + TopologyManager.updateTopology(topology); + } finally { + TopologyManager.releaseWriteLock(); + } + /* @TODO leftover from grouping_poc*/ + String groupAlias = null; + TopologyEventPublisher.sendMemberTerminatedEvent(serviceName, clusterId, networkPartitionId, + partitionId, memberId, properties, groupAlias, instanceId); + } + + public static void handleMemberSuspended() { + //TODO + try { + TopologyManager.acquireWriteLock(); + } finally { + TopologyManager.releaseWriteLock(); + } + } + + public static void handleClusterActivatedEvent(ClusterStatusClusterActivatedEvent clusterActivatedEvent) { + + Topology topology = TopologyManager.getTopology(); + Service service = topology.getService(clusterActivatedEvent.getServiceName()); + //update the status of the cluster + if (service == null) { + log.warn(String.format("Service %s does not exist", + clusterActivatedEvent.getServiceName())); + return; + } + + Cluster cluster = service.getCluster(clusterActivatedEvent.getClusterId()); + if (cluster == null) { + log.warn(String.format("Cluster %s does not exist", + clusterActivatedEvent.getClusterId())); + return; + } + + org.apache.stratos.messaging.event.topology.ClusterActivatedEvent clusterActivatedEvent1 = + new org.apache.stratos.messaging.event.topology.ClusterActivatedEvent( + clusterActivatedEvent.getAppId(), + clusterActivatedEvent.getServiceName(), + clusterActivatedEvent.getClusterId(), + clusterActivatedEvent.getInstanceId()); + try { + TopologyManager.acquireWriteLock(); + ClusterInstance context = cluster.getInstanceContexts(clusterActivatedEvent.getInstanceId()); + if (context == null) { + log.warn("Cluster Instance Context is not found for [cluster] " + + clusterActivatedEvent.getClusterId() + " [instance-id] " + + clusterActivatedEvent.getInstanceId()); + return; + } + ClusterStatus status = ClusterStatus.Active; + if(context.isStateTransitionValid(status)) { + context.setStatus(status); + log.info("Cluster activated adding status started for" + cluster.getClusterId()); + TopologyManager.updateTopology(topology); + //publishing data + TopologyEventPublisher.sendClusterActivatedEvent(clusterActivatedEvent1); + } else { + log.warn(String.format("Cluster state transition is not valid: [cluster-id] %s " + + " [instance-id] %s [current-status] %s [status-requested] %s", + clusterActivatedEvent.getClusterId(), clusterActivatedEvent.getInstanceId(), + context.getStatus(), status)); + } + } finally { + TopologyManager.releaseWriteLock(); + } + + } + + public static void handleClusterInActivateEvent( + ClusterStatusClusterInactivateEvent clusterInActivateEvent) { + Topology topology = TopologyManager.getTopology(); + Service service = topology.getService(clusterInActivateEvent.getServiceName()); + //update the status of the cluster + if (service == null) { + log.warn(String.format("Service %s does not exist", + clusterInActivateEvent.getServiceName())); + return; + } + + Cluster cluster = service.getCluster(clusterInActivateEvent.getClusterId()); + if (cluster == null) { + log.warn(String.format("Cluster %s does not exist", + clusterInActivateEvent.getClusterId())); + return; + } + + ClusterInactivateEvent clusterInActivatedEvent1 = + new ClusterInactivateEvent( + clusterInActivateEvent.getAppId(), + clusterInActivateEvent.getServiceName(), + clusterInActivateEvent.getClusterId(), + clusterInActivateEvent.getInstanceId()); + try { + TopologyManager.acquireWriteLock(); + ClusterInstance context = cluster.getInstanceContexts(clusterInActivateEvent.getInstanceId()); + if (context == null) { + log.warn("Cluster Instance Context is not found for [cluster] " + + clusterInActivateEvent.getClusterId() + " [instance-id] " + + clusterInActivateEvent.getInstanceId()); + return; + } + ClusterStatus status = ClusterStatus.Inactive; + if(context.isStateTransitionValid(status)) { + context.setStatus(status); + log.info("Cluster InActive adding status started for" + cluster.getClusterId()); + TopologyManager.updateTopology(topology); + //publishing data + TopologyEventPublisher.sendClusterInactivateEvent(clusterInActivatedEvent1); + } else { + log.warn(String.format("Cluster state transition is not valid: [cluster-id] %s " + + " [instance-id] %s [current-status] %s [status-requested] %s", + clusterInActivateEvent.getClusterId(), clusterInActivateEvent.getInstanceId(), + context.getStatus(), status)); + } + } finally { + TopologyManager.releaseWriteLock(); + } + } + + + private static void deleteAppResourcesFromMetadataService(ApplicationTerminatedEvent event) { + try { + MetaDataServiceClient metadataClient = new DefaultMetaDataServiceClient(); + metadataClient.deleteApplicationProperties(event.getAppId()); + } catch (Exception e) { + log.error("Error occurred while deleting the application resources frm metadata service ", e); + } + } + + public static void handleClusterTerminatedEvent(ClusterStatusClusterTerminatedEvent event) { + + TopologyManager.acquireWriteLock(); + + try { + Topology topology = TopologyManager.getTopology(); + Cluster cluster = topology.getService(event.getServiceName()). + getCluster(event.getClusterId()); + + if (!cluster.isStateTransitionValid(ClusterStatus.Terminated, null)) { + log.error("Invalid state transfer from " + cluster.getStatus(null) + " to " + + ClusterStatus.Terminated); + } + ClusterInstance context = cluster.getInstanceContexts(event.getInstanceId()); + if (context == null) { + log.warn("Cluster Instance Context is not found for [cluster] " + + event.getClusterId() + " [instance-id] " + + event.getInstanceId()); + return; + } + ClusterStatus status = ClusterStatus.Terminated; + if(context.isStateTransitionValid(status)) { + context.setStatus(status); + log.info("Cluster Terminated adding status started for" + cluster.getClusterId()); + TopologyManager.updateTopology(topology); + //publishing data + ClusterTerminatedEvent clusterTerminatedEvent = new ClusterTerminatedEvent(event.getAppId(), + event.getServiceName(), event.getClusterId(), event.getInstanceId()); + + TopologyEventPublisher.sendClusterTerminatedEvent(clusterTerminatedEvent); + } else { + log.warn(String.format("Cluster state transition is not valid: [cluster-id] %s " + + " [instance-id] %s [current-status] %s [status-requested] %s", + event.getClusterId(), event.getInstanceId(), + context.getStatus(), status)); + } + } finally { + TopologyManager.releaseWriteLock(); + } + + + } + + public static void handleClusterTerminatingEvent(ClusterStatusClusterTerminatingEvent event) { + + TopologyManager.acquireWriteLock(); + + try { + Topology topology = TopologyManager.getTopology(); + Cluster cluster = topology.getService(event.getServiceName()). + getCluster(event.getClusterId()); + + if (!cluster.isStateTransitionValid(ClusterStatus.Terminating, null)) { + log.error("Invalid state transfer from " + cluster.getStatus(null) + " to " + + ClusterStatus.Terminating); + } + ClusterInstance context = cluster.getInstanceContexts(event.getInstanceId()); + if (context == null) { + log.warn("Cluster Instance Context is not found for [cluster] " + + event.getClusterId() + " [instance-id] " + + event.getInstanceId()); + return; + } + ClusterStatus status = ClusterStatus.Terminating; + if(context.isStateTransitionValid(status)) { + context.setStatus(status); + log.info("Cluster Terminating adding status started for" + cluster.getClusterId()); + TopologyManager.updateTopology(topology); + //publishing data + ClusterTerminatingEvent clusterTerminaingEvent = new ClusterTerminatingEvent(event.getAppId(), + event.getServiceName(), event.getClusterId(), null); + + TopologyEventPublisher.sendClusterTerminatingEvent(clusterTerminaingEvent); + } else { + log.warn(String.format("Cluster state transition is not valid: [cluster-id] %s " + + " [instance-id] %s [current-status] %s [status-requested] %s", + event.getClusterId(), event.getInstanceId(), + context.getStatus(), status)); + } + } finally { + TopologyManager.releaseWriteLock(); + } + } +}
http://git-wip-us.apache.org/repos/asf/stratos/blob/8c359dc1/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyEventPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyEventPublisher.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyEventPublisher.java new file mode 100644 index 0000000..7000c16 --- /dev/null +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyEventPublisher.java @@ -0,0 +1,320 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package org.apache.stratos.cloud.controller.messaging.topology; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.cloud.controller.domain.Cartridge; +import org.apache.stratos.cloud.controller.domain.ClusterContext; +import org.apache.stratos.cloud.controller.domain.MemberContext; +import org.apache.stratos.cloud.controller.domain.PortMapping; +import org.apache.stratos.cloud.controller.util.CloudControllerUtil; +import org.apache.stratos.messaging.broker.publish.EventPublisher; +import org.apache.stratos.messaging.broker.publish.EventPublisherPool; +import org.apache.stratos.messaging.domain.applications.ClusterDataHolder; +import org.apache.stratos.messaging.domain.topology.Cluster; +import org.apache.stratos.messaging.domain.topology.Port; +import org.apache.stratos.messaging.domain.topology.ServiceType; +import org.apache.stratos.messaging.domain.topology.Topology; +import org.apache.stratos.messaging.event.Event; +import org.apache.stratos.messaging.event.instance.status.InstanceStartedEvent; +import org.apache.stratos.messaging.event.topology.*; +import org.apache.stratos.messaging.util.Util; + +import java.util.List; +import java.util.Properties; +import java.util.Set; + +/** + * this is to send the relevant events from cloud controller to topology topic + */ +public class TopologyEventPublisher { + private static final Log log = LogFactory.getLog(TopologyEventPublisher.class); + + public static void sendServiceCreateEvent(List<Cartridge> cartridgeList) { + ServiceCreatedEvent serviceCreatedEvent; + for (Cartridge cartridge : cartridgeList) { + serviceCreatedEvent = new ServiceCreatedEvent(cartridge.getType(), + (cartridge.isMultiTenant() ? ServiceType.MultiTenant + : ServiceType.SingleTenant)); + + // Add ports to the event + Port port; + List<PortMapping> portMappings = cartridge.getPortMappings(); + for (PortMapping portMapping : portMappings) { + port = new Port(portMapping.getProtocol(), + Integer.parseInt(portMapping.getPort()), + Integer.parseInt(portMapping.getProxyPort())); + serviceCreatedEvent.addPort(port); + } + + if (log.isInfoEnabled()) { + log.info(String.format( + "Publishing service created event: [service] %s", + cartridge.getType())); + } + publishEvent(serviceCreatedEvent); + } + } + + public static void sendServiceRemovedEvent(List<Cartridge> cartridgeList) { + ServiceRemovedEvent serviceRemovedEvent; + for (Cartridge cartridge : cartridgeList) { + serviceRemovedEvent = new ServiceRemovedEvent(cartridge.getType()); + if (log.isInfoEnabled()) { + log.info(String.format( + "Publishing service removed event: [service] %s", + serviceRemovedEvent.getServiceName())); + } + publishEvent(serviceRemovedEvent); + } + } + + public static void sendClusterResetEvent(String appId, String serviceName, String clusterId, + String instanceId) { + ClusterResetEvent clusterResetEvent = new ClusterResetEvent(appId, serviceName, + clusterId, instanceId); + + if (log.isInfoEnabled()) { + log.info("Publishing cluster reset event: " + clusterId); + } + publishEvent(clusterResetEvent); + } + + public static void sendClusterCreatedEvent(Cluster cluster) { + ClusterCreatedEvent clusterCreatedEvent = new ClusterCreatedEvent(cluster); + + if (log.isInfoEnabled()) { + log.info("Publishing cluster created event: " + cluster.getClusterId()); + } + publishEvent(clusterCreatedEvent); + } + + public static void sendApplicationClustersCreated(String appId, List<Cluster> clusters) { + + if (log.isInfoEnabled()) { + log.info("Publishing Application Clusters Created event for Application: " + appId); + } + + publishEvent(new ApplicationClustersCreatedEvent(clusters, appId)); + } + + public static void sendApplicationClustersRemoved(String appId, Set<ClusterDataHolder> clusters) { + + if (log.isInfoEnabled()) { + log.info("Publishing Application Clusters removed event for Application: " + appId); + } + + publishEvent(new ApplicationClustersRemovedEvent(clusters, appId)); + } + + public static void sendClusterRemovedEvent(ClusterContext ctxt, String deploymentPolicy) { + ClusterRemovedEvent clusterRemovedEvent = new ClusterRemovedEvent( + ctxt.getCartridgeType(), ctxt.getClusterId(), deploymentPolicy, ctxt.isLbCluster()); + if (log.isInfoEnabled()) { + log.info(String + .format("Publishing cluster removed event: [service] %s [cluster] %s", + ctxt.getCartridgeType(), ctxt.getClusterId())); + } + publishEvent(clusterRemovedEvent); + + } + + public static void sendInstanceSpawnedEvent(String serviceName, + String clusterId, String networkPartitionId, String partitionId, + String memberId, String lbClusterId, String publicIp, + String privateIp, MemberContext context) { + + long initTime = context.getInitTime(); + InstanceSpawnedEvent instanceSpawnedEvent = new InstanceSpawnedEvent( + serviceName, clusterId, networkPartitionId, partitionId, + memberId, initTime, context.getInstanceId()); + instanceSpawnedEvent.setLbClusterId(lbClusterId); + instanceSpawnedEvent.setMemberIp(privateIp); + instanceSpawnedEvent.setMemberPublicIp(publicIp); + instanceSpawnedEvent.setProperties(CloudControllerUtil + .toJavaUtilProperties(context.getProperties())); + log.info(String.format("Publishing instance spawned event: [service] %s [cluster] %s " + + " [instance-id] %s [network-partition] %s [partition] %s " + + "[member]%s [lb-cluster-id] %s", + serviceName, clusterId, context.getInstanceId(), networkPartitionId, partitionId, + memberId, lbClusterId)); + publishEvent(instanceSpawnedEvent); + } + + public static void sendMemberStartedEvent(InstanceStartedEvent instanceStartedEvent) { + MemberStartedEvent memberStartedEventTopology = new MemberStartedEvent(instanceStartedEvent.getServiceName(), + instanceStartedEvent.getClusterId(), instanceStartedEvent.getNetworkPartitionId(), + instanceStartedEvent.getPartitionId(), instanceStartedEvent.getMemberId(), + instanceStartedEvent.getInstanceId()); + if (log.isInfoEnabled()) { + log.info(String + .format("Publishing member started event: [service] %s [cluster] %s [instance-id] %s " + + "[network-partition] %s [partition] %s [member] %s", + instanceStartedEvent.getServiceName(), + instanceStartedEvent.getClusterId(), + instanceStartedEvent.getInstanceId(), + instanceStartedEvent.getNetworkPartitionId(), + instanceStartedEvent.getPartitionId(), + instanceStartedEvent.getMemberId())); + } + publishEvent(memberStartedEventTopology); + } + + public static void sendMemberActivatedEvent( + MemberActivatedEvent memberActivatedEvent) { + if (log.isInfoEnabled()) { + log.info(String + .format("Publishing member activated event: [service] %s [cluster] %s " + + "[instance-id] %s [network-partition] %s [partition] %s [member] %s", + memberActivatedEvent.getServiceName(), + memberActivatedEvent.getClusterId(), + memberActivatedEvent.getInstanceId(), + memberActivatedEvent.getNetworkPartitionId(), + memberActivatedEvent.getPartitionId(), + memberActivatedEvent.getMemberId())); + } + publishEvent(memberActivatedEvent); + } + + public static void sendMemberReadyToShutdownEvent(MemberReadyToShutdownEvent memberReadyToShutdownEvent) { + if (log.isInfoEnabled()) { + log.info(String.format("Publishing member Ready to shut down event: [service] %s " + + " [instance-id] %s [cluster] %s [network-partition] %s [partition] %s " + + "[member] %s [groupId] %s", + memberReadyToShutdownEvent.getServiceName(), + memberReadyToShutdownEvent.getClusterId(), + memberReadyToShutdownEvent.getInstanceId(), + memberReadyToShutdownEvent.getNetworkPartitionId(), + memberReadyToShutdownEvent.getPartitionId(), + memberReadyToShutdownEvent.getMemberId(), + memberReadyToShutdownEvent.getGroupId())); + } + // grouping + memberReadyToShutdownEvent.setGroupId(memberReadyToShutdownEvent.getGroupId()); + publishEvent(memberReadyToShutdownEvent); + } + + public static void sendMemberMaintenanceModeEvent(MemberMaintenanceModeEvent memberMaintenanceModeEvent) { + if (log.isInfoEnabled()) { + log.info(String.format("Publishing Maintenance mode event: [service] %s [cluster] %s " + + " [instance-id] %s [network-partition] %s [partition] %s [member] %s " + + "[groupId] %s", memberMaintenanceModeEvent.getServiceName(), + memberMaintenanceModeEvent.getClusterId(), + memberMaintenanceModeEvent.getInstanceId(), + memberMaintenanceModeEvent.getNetworkPartitionId(), + memberMaintenanceModeEvent.getPartitionId(), + memberMaintenanceModeEvent.getMemberId(), + memberMaintenanceModeEvent.getGroupId())); + } + + publishEvent(memberMaintenanceModeEvent); + } + + public static void sendClusterActivatedEvent(ClusterActivatedEvent clusterActivatedEvent) { + if (log.isInfoEnabled()) { + log.info(String.format("Publishing cluster activated event: [service] %s [cluster] %s " + + " [instance-id] %s [appId] %s", + clusterActivatedEvent.getServiceName(), + clusterActivatedEvent.getClusterId(), + clusterActivatedEvent.getInstanceId(), + clusterActivatedEvent.getAppId())); + } + publishEvent(clusterActivatedEvent); + } + + public static void sendClusterInactivateEvent(ClusterInactivateEvent clusterInactiveEvent) { + if (log.isInfoEnabled()) { + log.info(String.format("Publishing cluster in-active event: [service] %s [cluster] %s " + + "[instance-id] %s [appId] %s", + clusterInactiveEvent.getServiceName(), clusterInactiveEvent.getClusterId(), + clusterInactiveEvent.getInstanceId(), clusterInactiveEvent.getAppId())); + } + publishEvent(clusterInactiveEvent); + } + + public static void sendClusterInstanceCreatedEvent(ClusterInstanceCreatedEvent clusterInstanceCreatedEvent) { + if (log.isInfoEnabled()) { + log.info(String.format("Publishing cluster Instance Created event: [service] %s [cluster] %s " + + "[instance-id] %s", + clusterInstanceCreatedEvent.getServiceName(), clusterInstanceCreatedEvent.getClusterId(), + clusterInstanceCreatedEvent.getInstanceId())); + } + publishEvent(clusterInstanceCreatedEvent); + } + + + public static void sendMemberTerminatedEvent(String serviceName, String clusterId, String networkPartitionId, + String partitionId, String memberId, + Properties properties, String groupId, String instanceId) { + MemberTerminatedEvent memberTerminatedEvent = new MemberTerminatedEvent(serviceName, clusterId, + networkPartitionId, partitionId, memberId, instanceId); + memberTerminatedEvent.setProperties(properties); + memberTerminatedEvent.setGroupId(groupId); + + if (log.isInfoEnabled()) { + log.info(String.format("Publishing member terminated event: [service] %s [cluster] %s " + + " [instance-id] %s [network-partition] %s [partition] %s [member] %s " + + "[groupId] %s", serviceName, clusterId, instanceId, networkPartitionId, + partitionId, memberId, groupId)); + } + + publishEvent(memberTerminatedEvent); + } + + public static void sendCompleteTopologyEvent(Topology topology) { + CompleteTopologyEvent completeTopologyEvent = new CompleteTopologyEvent(topology); + + if (log.isDebugEnabled()) { + log.debug(String.format("Publishing complete topology event")); + } + publishEvent(completeTopologyEvent); + } + + public static void sendClusterTerminatingEvent(ClusterTerminatingEvent clusterTerminatingEvent) { + + if (log.isInfoEnabled()) { + log.info(String.format("Publishing Cluster terminating event: [appId] %s [cluster id] %s" + + " [instance-id] %s ", + clusterTerminatingEvent.getAppId(), clusterTerminatingEvent.getClusterId(), + clusterTerminatingEvent.getInstanceId())); + } + + publishEvent(clusterTerminatingEvent); + } + + public static void sendClusterTerminatedEvent(ClusterTerminatedEvent clusterTerminatedEvent) { + + if (log.isInfoEnabled()) { + log.info(String.format("Publishing Cluster terminated event: [appId] %s [cluster id] %s" + + " [instance-id] %s ", + clusterTerminatedEvent.getAppId(), clusterTerminatedEvent.getClusterId(), + clusterTerminatedEvent.getInstanceId())); + } + + publishEvent(clusterTerminatedEvent); + } + + public static void publishEvent(Event event) { + String topic = Util.getMessageTopicName(event); + EventPublisher eventPublisher = EventPublisherPool.getPublisher(topic); + eventPublisher.publish(event); + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/8c359dc1/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyManager.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyManager.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyManager.java new file mode 100644 index 0000000..2c51557 --- /dev/null +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyManager.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.cloud.controller.messaging.topology; + +import com.google.gson.Gson; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.cloud.controller.util.CloudControllerUtil; +import org.apache.stratos.messaging.domain.topology.Topology; + +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * Persistence and retrieval of Topology from Registry + */ +public class TopologyManager { + private static final Log log = LogFactory.getLog(TopologyManager.class); + + private static volatile ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); + private static volatile ReentrantReadWriteLock.ReadLock readLock = lock.readLock(); + private static volatile ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock(); + private static volatile Topology topology; + + private TopologyManager() { + } + + public static void acquireReadLock() { + if(log.isDebugEnabled()) { + log.debug("Read lock acquired"); + } + readLock.lock(); + } + + public static void releaseReadLock() { + if(log.isDebugEnabled()) { + log.debug("Read lock released"); + } + readLock.unlock(); + } + + public static void acquireWriteLock() { + if(log.isDebugEnabled()) { + log.debug("Write lock acquired"); + } + writeLock.lock(); + } + + public static void releaseWriteLock() { + if(log.isDebugEnabled()) { + log.debug("Write lock released"); + } + writeLock.unlock(); + } + + public static Topology getTopology() { + if (topology == null) { + synchronized (TopologyManager.class) { + if (topology == null) { + if (log.isDebugEnabled()) { + log.debug("Trying to retrieve topology from registry"); + } + topology = CloudControllerUtil.retrieveTopology(); + if (topology == null) { + if (log.isDebugEnabled()) { + log.debug("Topology not found in registry, creating new"); + } + topology = new Topology(); + } + if (log.isDebugEnabled()) { + log.debug("Topology initialized"); + } + } + } + } + return topology; + } + + /** + * Update in-memory topology and persist it in registry. + * @param topology_ + */ + public static void updateTopology(Topology topology_) { + synchronized (TopologyManager.class) { + if (log.isDebugEnabled()) { + log.debug("Updating topology"); + } + topology = topology_; + CloudControllerUtil.persistTopology(topology); + if (log.isDebugEnabled()) { + log.debug(String.format("Topology updated: %s", toJson(topology))); + } + } + + } + + private static String toJson(Object object) { + Gson gson = new Gson(); + return gson.toJson(object); + } +} + http://git-wip-us.apache.org/repos/asf/stratos/blob/8c359dc1/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologySynchronizerTask.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologySynchronizerTask.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologySynchronizerTask.java new file mode 100644 index 0000000..5117823 --- /dev/null +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologySynchronizerTask.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.cloud.controller.messaging.topology; + +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.cloud.controller.context.FasterLookUpDataHolder; +import org.wso2.carbon.ntask.core.Task; + +public class TopologySynchronizerTask implements Task{ + private static final Log log = LogFactory.getLog(TopologySynchronizerTask.class); + + @Override + public void execute() { + if (log.isDebugEnabled()) { + log.debug("Executing topology synchronization task"); + } + + if(FasterLookUpDataHolder.getInstance().isTopologySyncRunning() || + // this is a temporary fix to avoid task execution - limitation with ntask + (!FasterLookUpDataHolder.getInstance().getEnableTopologySync())){ + if(log.isWarnEnabled()) { + log.warn("Topology synchronization is disabled."); + } + return; + } + + // publish to the topic + if (TopologyManager.getTopology() != null) { + TopologyEventPublisher.sendCompleteTopologyEvent(TopologyManager.getTopology()); + } + } + + @Override + public void init() { + + // this is a temporary fix to avoid task execution - limitation with ntask + if(!FasterLookUpDataHolder.getInstance().getEnableTopologySync()){ + if(log.isWarnEnabled()) { + log.warn("Topology synchronization is disabled."); + } + return; + } + } + + @Override + public void setProperties(Map<String, String> arg0) {} + +} http://git-wip-us.apache.org/repos/asf/stratos/blob/8c359dc1/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/persist/Deserializer.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/persist/Deserializer.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/persist/Deserializer.java deleted file mode 100644 index 0deb947..0000000 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/persist/Deserializer.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.stratos.cloud.controller.persist; - -import java.io.ByteArrayInputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectInputStream; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -public class Deserializer { - - private static final Log log = LogFactory.getLog(Deserializer.class); - - /** - * We deserialize only if the path to the serialized object file is exists. - * @param filePath path to the serialized object file - * @return the object obtained after deserialization or null if file isn't valid. - * @throws Exception - */ - public static Object deserialize(String filePath) throws Exception { - - ObjectInputStream objIn = null; - Object obj = null; - - if(!new File(filePath).isFile()){ - return obj; - } - - try { - - objIn = new ObjectInputStream(new FileInputStream(filePath)); - obj = objIn.readObject(); - - } catch (IOException e) { - log.error("Failed to deserialize the file at "+filePath , e); - throw e; - - } catch (ClassNotFoundException e) { - log.error("Failed to deserialize the file at "+filePath , e); - throw e; - - } finally { - if (objIn != null) { - objIn.close(); - } - } - - return obj; - - } - - /** - * Deserialize a byte array and retrieve the object. - * @param bytes bytes to be deserialized - * @return the deserialized {@link Object} - * @throws Exception if the deserialization is failed. - */ - public static Object deserializeFromByteArray(byte[] bytes) throws Exception { - - ByteArrayInputStream bis = new ByteArrayInputStream(bytes); - ObjectInput in = null; - try { - in = new ObjectInputStream(bis); - Object o = in.readObject(); - - return o; - - } finally { - bis.close(); - if (in != null) { - in.close(); - } - } - } - -} http://git-wip-us.apache.org/repos/asf/stratos/blob/8c359dc1/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/persist/Serializer.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/persist/Serializer.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/persist/Serializer.java deleted file mode 100644 index e2d1137..0000000 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/persist/Serializer.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.stratos.cloud.controller.persist; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.stratos.cloud.controller.runtime.FasterLookUpDataHolder; -import org.apache.stratos.messaging.domain.topology.Topology; - -import java.io.*; - -public class Serializer { - - private static final Log log = LogFactory.getLog(Serializer.class); - - public static void serializeToFile(Object serializableObj, String filePath) throws IOException { - - File outFile = new File(filePath); - ObjectOutput objOut = null; - FileOutputStream fileOutputStream = null; - - try { - - if(outFile.createNewFile()){ - log.debug("Serialization file is created at "+filePath); - } else{ - log.debug("Serialization file is already existing at "+filePath); - } - fileOutputStream = new FileOutputStream(outFile); - objOut = new ObjectOutputStream(fileOutputStream); - objOut.writeObject(serializableObj); - - } catch (IOException e) { - log.error("Failed to serialize the object "+serializableObj.toString() - + " to file "+filePath , e); - throw e; - - } finally{ - if(objOut != null) { - objOut.close(); - } - if(fileOutputStream != null) { - fileOutputStream.close(); - } - } - - } - - /** - * Serialize a {@link org.apache.stratos.cloud.controller.runtime.FasterLookUpDataHolder} to a byte array. - * @param serializableObj - * @return byte[] - * @throws IOException - */ - public static byte[] serializeToByteArray(FasterLookUpDataHolder serializableObj) throws IOException { - - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - ObjectOutput out = null; - try { - out = new ObjectOutputStream(bos); - out.writeObject(serializableObj); - - return bos.toByteArray(); - - } finally { - if (out != null) { - out.close(); - } - bos.close(); - } - - } - - /** - * Serialize a {@link org.apache.stratos.cloud.controller.runtime.FasterLookUpDataHolder} to a byte array. - * @param topology - * @return byte[] - * @throws IOException - */ - public static byte[] serializeToByteArray(Topology topology) throws IOException { - - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - ObjectOutput out = null; - try { - out = new ObjectOutputStream(bos); - out.writeObject(topology); - - return bos.toByteArray(); - - } finally { - if (out != null) { - out.close(); - } - bos.close(); - } - - } - -} http://git-wip-us.apache.org/repos/asf/stratos/blob/8c359dc1/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/pojo/AppType.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/pojo/AppType.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/pojo/AppType.java deleted file mode 100644 index 135a695..0000000 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/pojo/AppType.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.stratos.cloud.controller.pojo; - -import java.io.Serializable; - -/** - * domain mapping related data. - * - */ -public class AppType implements Serializable{ - - private static final long serialVersionUID = 3550489774139807168L; - private String name; - private boolean appSpecificMapping = true; - - public AppType(){ - - } - - public AppType(String name){ - this.setName(name); - } - - public AppType(String name, boolean appSpecificMapping){ - this.setName(name); - this.setAppSpecificMapping(appSpecificMapping); - } - - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - - public boolean isAppSpecificMapping() { - return appSpecificMapping; - } - - public void setAppSpecificMapping(boolean appSpecificMapping) { - this.appSpecificMapping = appSpecificMapping; - } - -} http://git-wip-us.apache.org/repos/asf/stratos/blob/8c359dc1/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/pojo/ApplicationClusterContextDTO.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/pojo/ApplicationClusterContextDTO.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/pojo/ApplicationClusterContextDTO.java deleted file mode 100644 index 9e3d046..0000000 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/pojo/ApplicationClusterContextDTO.java +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.stratos.cloud.controller.pojo; - - -import org.apache.stratos.common.Properties; - -public class ApplicationClusterContextDTO { - - // cluster id - private String clusterId; - // cartridge type - private String cartridgeType; - // payload as a String - private String textPayload; - // host name - private String hostName; - // flag to indicate LB cluster - private boolean isLbCluster; - // autoscaling policy - private String autoscalePolicyName; - // deployment policy - private String deploymentPolicyName; - // tenant rance - private String tenantRange; - // propertis - private Properties properties; - - - public ApplicationClusterContextDTO () { - } - - - public String getClusterId() { - return clusterId; - } - - public void setClusterId(String clusterId) { - this.clusterId = clusterId; - } - - public String getCartridgeType() { - return cartridgeType; - } - - public void setCartridgeType(String cartridgeType) { - this.cartridgeType = cartridgeType; - } - - public String getTextPayload() { - return textPayload; - } - - public void setTextPayload(String textPayload) { - this.textPayload = textPayload; - } - - public String getHostName() { - return hostName; - } - - public void setHostName(String hostName) { - this.hostName = hostName; - } - - public boolean isLbCluster() { - return isLbCluster; - } - - public void setLbCluster(boolean lbCluster) { - isLbCluster = lbCluster; - } - - public String getAutoscalePolicyName() { - return autoscalePolicyName; - } - - public void setAutoscalePolicyName(String autoscalePolicyName) { - this.autoscalePolicyName = autoscalePolicyName; - } - - public String getDeploymentPolicyName() { - return deploymentPolicyName; - } - - public void setDeploymentPolicyName(String deploymentPolicyName) { - this.deploymentPolicyName = deploymentPolicyName; - } - - public String getTenantRange() { - return tenantRange; - } - - public void setTenantRange(String tenantRange) { - this.tenantRange = tenantRange; - } - - public boolean equals(Object other) { - - if(other == null || !(other instanceof ApplicationClusterContextDTO)) { - return false; - } - - if (this == other) { - return true; - } - - ApplicationClusterContextDTO that = (ApplicationClusterContextDTO)other; - - return this.cartridgeType.equals(that.cartridgeType) && - this.clusterId.equals(that.clusterId); - } - - public int hashCode() { - return this.cartridgeType.hashCode() + this.clusterId.hashCode(); - } - - public Properties getProperties() { - return properties; - } - - public void setProperties(Properties properties) { - this.properties = properties; - } - -}
