http://git-wip-us.apache.org/repos/asf/stratos/blob/8c359dc1/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java deleted file mode 100644 index d72c552..0000000 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java +++ /dev/null @@ -1,962 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.stratos.cloud.controller.topology; - -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.stratos.cloud.controller.exception.CloudControllerException; -import org.apache.stratos.cloud.controller.exception.InvalidCartridgeTypeException; -import org.apache.stratos.cloud.controller.exception.InvalidMemberException; -import org.apache.stratos.cloud.controller.pojo.Cartridge; -import org.apache.stratos.cloud.controller.pojo.*; -import org.apache.stratos.cloud.controller.publisher.CartridgeInstanceDataPublisher; -import org.apache.stratos.cloud.controller.registry.RegistryManager; -import org.apache.stratos.cloud.controller.runtime.FasterLookUpDataHolder; -import org.apache.stratos.cloud.controller.util.CloudControllerUtil; -import org.apache.stratos.common.constants.StratosConstants; -import org.apache.stratos.messaging.domain.applications.ClusterDataHolder; -import org.apache.stratos.messaging.domain.instance.ClusterInstance; -import org.apache.stratos.messaging.domain.topology.*; -import org.apache.stratos.messaging.event.applications.ApplicationTerminatedEvent; -import org.apache.stratos.messaging.event.cluster.status.*; -import org.apache.stratos.messaging.event.instance.status.InstanceActivatedEvent; -import org.apache.stratos.messaging.event.instance.status.InstanceMaintenanceModeEvent; -import org.apache.stratos.messaging.event.instance.status.InstanceReadyToShutdownEvent; -import org.apache.stratos.messaging.event.instance.status.InstanceStartedEvent; -import org.apache.stratos.messaging.event.topology.*; -import org.apache.stratos.metadata.client.defaults.DefaultMetaDataServiceClient; -import org.apache.stratos.metadata.client.defaults.MetaDataServiceClient; -import org.wso2.carbon.registry.core.exceptions.RegistryException; - -import java.util.*; - -/** - * this is to manipulate the received events by cloud controller - * and build the complete topology with the events received - */ -public class TopologyBuilder { - private static final Log log = LogFactory.getLog(TopologyBuilder.class); - - - public static void handleServiceCreated(List<Cartridge> cartridgeList) { - Service service; - Topology topology = TopologyManager.getTopology(); - if (cartridgeList == null) { - log.warn(String.format("Cartridge list is empty")); - return; - } - - try { - - TopologyManager.acquireWriteLock(); - for (Cartridge cartridge : cartridgeList) { - if (!topology.serviceExists(cartridge.getType())) { - service = new Service(cartridge.getType(), cartridge.isMultiTenant() ? ServiceType.MultiTenant : ServiceType.SingleTenant); - List<PortMapping> portMappings = cartridge.getPortMappings(); - Properties properties = new Properties(); - for (Map.Entry<String, String> entry : cartridge.getProperties().entrySet()) { - properties.setProperty(entry.getKey(), entry.getValue()); - } - service.setProperties(properties); - Port port; - //adding ports to the event - for (PortMapping portMapping : portMappings) { - port = new Port(portMapping.getProtocol(), - Integer.parseInt(portMapping.getPort()), - Integer.parseInt(portMapping.getProxyPort())); - service.addPort(port); - } - topology.addService(service); - TopologyManager.updateTopology(topology); - } - } - } finally { - TopologyManager.releaseWriteLock(); - } - TopologyEventPublisher.sendServiceCreateEvent(cartridgeList); - - } - - public static void handleServiceRemoved(List<Cartridge> cartridgeList) { - Topology topology = TopologyManager.getTopology(); - - for (Cartridge cartridge : cartridgeList) { - if (topology.getService(cartridge.getType()).getClusters().size() == 0) { - if (topology.serviceExists(cartridge.getType())) { - try { - TopologyManager.acquireWriteLock(); - topology.removeService(cartridge.getType()); - TopologyManager.updateTopology(topology); - } finally { - TopologyManager.releaseWriteLock(); - } - TopologyEventPublisher.sendServiceRemovedEvent(cartridgeList); - } else { - log.warn(String.format("Service %s does not exist..", cartridge.getType())); - } - } else { - log.warn("Subscription already exists. Hence not removing the service:" + cartridge.getType() - + " from the topology"); - } - } - } - - public static void handleClusterCreated(ClusterStatusClusterCreatedEvent event) { - TopologyManager.acquireWriteLock(); - Cluster cluster; - - try { - Topology topology = TopologyManager.getTopology(); - Service service = topology.getService(event.getServiceName()); - if (service == null) { - log.error("Service " + event.getServiceName() + - " not found in Topology, unable to update the cluster status to Created"); - return; - } - - if (service.clusterExists(event.getClusterId())) { - log.warn("Cluster " + event.getClusterId() + " is already in the Topology "); - return; - } else { - cluster = new Cluster(event.getServiceName(), - event.getClusterId(), event.getDeploymentPolicyName(), - event.getAutosScalePolicyName(), event.getAppId()); - //cluster.setStatus(Status.Created); - cluster.setHostNames(event.getHostNames()); - cluster.setTenantRange(event.getTenantRange()); - service.addCluster(cluster); - TopologyManager.updateTopology(topology); - } - } finally { - TopologyManager.releaseWriteLock(); - } - - TopologyEventPublisher.sendClusterCreatedEvent(cluster); - } - - public static void handleApplicationClustersCreated(String appId, List<Cluster> appClusters) { - - TopologyManager.acquireWriteLock(); - - try { - Topology topology = TopologyManager.getTopology(); - - for (Cluster cluster : appClusters) { - Service service = topology.getService(cluster.getServiceName()); - if (service == null) { - log.error("Service " + cluster.getServiceName() - + " not found in Topology, unable to create Application cluster"); - } else { - service.addCluster(cluster); - log.info("Application Cluster " + cluster.getClusterId() + " created in CC topology"); - } - } - - TopologyManager.updateTopology(topology); - - } finally { - TopologyManager.releaseWriteLock(); - } - - TopologyEventPublisher.sendApplicationClustersCreated(appId, appClusters); - - } - - public static void handleApplicationClustersRemoved(String appId, Set<ClusterDataHolder> clusterData) { - TopologyManager.acquireWriteLock(); - - List<Cluster> removedClusters = new ArrayList<Cluster>(); - FasterLookUpDataHolder dataHolder = FasterLookUpDataHolder.getInstance(); - try { - Topology topology = TopologyManager.getTopology(); - - if (clusterData != null) { - // remove clusters from CC topology model and remove runtime information - for (ClusterDataHolder aClusterData : clusterData) { - Service aService = topology.getService(aClusterData.getServiceType()); - if (aService != null) { - removedClusters.add(aService.removeCluster(aClusterData.getClusterId())); - } else { - log.warn("Service " + aClusterData.getServiceType() + " not found, unable to remove Cluster " + aClusterData.getClusterId()); - } - // remove runtime data - dataHolder.removeClusterContext(aClusterData.getClusterId()); - - log.info("Removed application [ " + appId + " ]'s Cluster [ " + aClusterData.getClusterId() + " ] from the topology"); - } - // persist runtime data changes - persist(dataHolder); - } else { - log.info("No cluster data found for application " + appId + " to remove"); - } - - TopologyManager.updateTopology(topology); - - } finally { - TopologyManager.releaseWriteLock(); - } - - TopologyEventPublisher.sendApplicationClustersRemoved(appId, clusterData); - - } - - /** - * Persist data in registry. - */ - private static void persist(FasterLookUpDataHolder dataHolder) { - try { - RegistryManager.getInstance().persist( - dataHolder); - } catch (RegistryException e) { - - String msg = "Failed to persist the Cloud Controller data in registry. Further, transaction roll back also failed."; - log.fatal(msg); - throw new CloudControllerException(msg, e); - } - } - - public static void handleClusterReset(ClusterStatusClusterResetEvent event) { - - TopologyManager.acquireWriteLock(); - - try { - Topology topology = TopologyManager.getTopology(); - Service service = topology.getService(event.getServiceName()); - if (service == null) { - log.error("Service " + event.getServiceName() + - " not found in Topology, unable to update the cluster status to Created"); - return; - } - - Cluster cluster = service.getCluster(event.getClusterId()); - if (cluster == null) { - log.error("Cluster " + event.getClusterId() + " not found in Topology, unable to update " + - "status to Created"); - return; - } - - ClusterInstance context = cluster.getInstanceContexts(event.getInstanceId()); - if (context == null) { - log.warn("Cluster Instance Context is not found for [cluster] " + - event.getClusterId() + " [instance-id] " + - event.getInstanceId()); - return; - } - ClusterStatus status = ClusterStatus.Created; - if(context.isStateTransitionValid(status)) { - context.setStatus(status); - log.info("Cluster Created adding status started for" + cluster.getClusterId()); - TopologyManager.updateTopology(topology); - //publishing data - TopologyEventPublisher.sendClusterResetEvent(event.getAppId(), event.getServiceName(), - event.getClusterId(), event.getInstanceId()); - } else { - log.warn(String.format("Cluster state transition is not valid: [cluster-id] %s " + - " [instance-id] %s [current-status] %s [status-requested] %s", - event.getClusterId(), event.getInstanceId(), - context.getStatus(), status)); - } - - } finally { - TopologyManager.releaseWriteLock(); - } - - - } - - public static void handleClusterInstanceCreated(String serviceType, String clusterId, String alias, String instanceId) { - - TopologyManager.acquireWriteLock(); - - try { - Topology topology = TopologyManager.getTopology(); - Service service = topology.getService(serviceType); - if (service == null) { - log.error("Service " + serviceType + - " not found in Topology, unable to update the cluster status to Created"); - return; - } - - Cluster cluster = service.getCluster(clusterId); - if (cluster == null) { - log.error("Cluster " + clusterId + " not found in Topology, unable to update " + - "status to Created"); - return; - } - - if(cluster.getInstanceContexts(instanceId) != null) { - log.warn("The Instance context for the cluster already exists for [cluster] " + - clusterId + " [instance-id] " + instanceId); - return; - } - - //context.setStatus(ClusterStatus.Created); - cluster.addInstanceContext(instanceId, new ClusterInstance(alias, clusterId, instanceId)); - TopologyManager.updateTopology(topology); - - ClusterInstanceCreatedEvent clusterInstanceCreatedEvent = - new ClusterInstanceCreatedEvent(alias, serviceType, clusterId, instanceId); - TopologyEventPublisher.sendClusterInstanceCreatedEvent(clusterInstanceCreatedEvent); - - } finally { - TopologyManager.releaseWriteLock(); - } - } - - - - public static void handleClusterCreated(Registrant registrant, boolean isLb) { - /*Topology topology = TopologyManager.getTopology(); - Service service; - try { - TopologyManager.acquireWriteLock(); - String cartridgeType = registrant.getCartridgeType(); - service = topology.getService(cartridgeType); - Properties props = CloudControllerUtil.toJavaUtilProperties(registrant.getProperties()); - - Cluster cluster; - String clusterId = registrant.getClusterId(); - if (service.clusterExists(clusterId)) { - // update the cluster - cluster = service.getCluster(clusterId); - cluster.addHostName(registrant.getHostName()); - if (service.getServiceType() == ServiceType.MultiTenant) { - cluster.setTenantRange(registrant.getTenantRange()); - } - if (service.getProperties().getProperty(Constants.IS_PRIMARY) != null) { - props.setProperty(Constants.IS_PRIMARY, service.getProperties().getProperty(Constants.IS_PRIMARY)); - } - cluster.setProperties(props); - cluster.setLbCluster(isLb); - } else { - cluster = new Cluster(cartridgeType, clusterId, - registrant.getDeploymentPolicyName(), registrant.getAutoScalerPolicyName(), null); - cluster.addHostName(registrant.getHostName()); - if (service.getServiceType() == ServiceType.MultiTenant) { - cluster.setTenantRange(registrant.getTenantRange()); - } - if (service.getProperties().getProperty(Constants.IS_PRIMARY) != null) { - props.setProperty(Constants.IS_PRIMARY, service.getProperties().getProperty(Constants.IS_PRIMARY)); - } - cluster.setProperties(props); - cluster.setLbCluster(isLb); - //cluster.setStatus(Status.Created); - service.addCluster(cluster); - } - TopologyManager.updateTopology(topology); - TopologyEventPublisher.sendClusterCreatedEvent(cartridgeType, clusterId, cluster); - - } finally { - TopologyManager.releaseWriteLock(); - }*/ - } - - - private static void setKubernetesCluster(Cluster cluster) { - boolean isKubernetesCluster = (cluster.getProperties().getProperty(StratosConstants.KUBERNETES_CLUSTER_ID) != null); - if (log.isDebugEnabled()) { - log.debug(" Kubernetes Cluster ["+ isKubernetesCluster + "] "); - } - cluster.setKubernetesCluster(isKubernetesCluster); - } - - public static void handleClusterRemoved(ClusterContext ctxt) { - Topology topology = TopologyManager.getTopology(); - Service service = topology.getService(ctxt.getCartridgeType()); - String deploymentPolicy; - if (service == null) { - log.warn(String.format("Service %s does not exist", - ctxt.getCartridgeType())); - return; - } - - if (!service.clusterExists(ctxt.getClusterId())) { - log.warn(String.format("Cluster %s does not exist for service %s", - ctxt.getClusterId(), - ctxt.getCartridgeType())); - return; - } - - try { - TopologyManager.acquireWriteLock(); - Cluster cluster = service.removeCluster(ctxt.getClusterId()); - deploymentPolicy = cluster.getDeploymentPolicyName(); - TopologyManager.updateTopology(topology); - } finally { - TopologyManager.releaseWriteLock(); - } - TopologyEventPublisher.sendClusterRemovedEvent(ctxt, deploymentPolicy); - } - - public static void handleMemberSpawned(String serviceName, - String clusterId, String partitionId, - String privateIp, String publicIp, MemberContext context) { - // adding the new member to the cluster after it is successfully started - // in IaaS. - Topology topology = TopologyManager.getTopology(); - Service service = topology.getService(serviceName); - Cluster cluster = service.getCluster(clusterId); - String memberId = context.getMemberId(); - String networkPartitionId = context.getNetworkPartitionId(); - String lbClusterId = context.getLbClusterId(); - long initTime = context.getInitTime(); - - if (cluster.memberExists(memberId)) { - log.warn(String.format("Member %s already exists", memberId)); - return; - } - - try { - TopologyManager.acquireWriteLock(); - Member member = new Member(serviceName, clusterId, - networkPartitionId, partitionId, memberId, initTime); - member.setStatus(MemberStatus.Created); - member.setInstanceId(context.getInstanceId()); - member.setMemberIp(privateIp); - member.setLbClusterId(lbClusterId); - member.setMemberPublicIp(publicIp); - member.setProperties(CloudControllerUtil.toJavaUtilProperties(context.getProperties())); - try { - - Cartridge cartridge = FasterLookUpDataHolder.getInstance().getCartridge(serviceName); - List<PortMapping> portMappings = cartridge.getPortMappings(); - Port port; - if(cluster.isKubernetesCluster()){ - // Update port mappings with generated service proxy port - // TODO: Need to properly fix with the latest Kubernetes version - String serviceHostPortStr = CloudControllerUtil.getProperty(context.getProperties(), StratosConstants.ALLOCATED_SERVICE_HOST_PORT); - if(StringUtils.isEmpty(serviceHostPortStr)) { - log.warn("Kubernetes service host port not found for member: [member-id] " + memberId); - } - // Adding ports to the member - if (StringUtils.isNotEmpty(serviceHostPortStr)) { - for (PortMapping portMapping : portMappings) { - port = new Port(portMapping.getProtocol(), - Integer.parseInt(serviceHostPortStr), - Integer.parseInt(portMapping.getProxyPort())); - member.addPort(port); - } - } - - } else { - - // Adding ports to the member - for (PortMapping portMapping : portMappings) { - - port = new Port(portMapping.getProtocol(), - Integer.parseInt(portMapping.getPort()), - Integer.parseInt(portMapping.getProxyPort())); - member.addPort(port); - - } - } - - } catch (Exception e) { - log.error("Could not update member port-map: [member-id] " + memberId, e); - } - cluster.addMember(member); - TopologyManager.updateTopology(topology); - } finally { - TopologyManager.releaseWriteLock(); - } - - TopologyEventPublisher.sendInstanceSpawnedEvent(serviceName, clusterId, - networkPartitionId, partitionId, memberId, lbClusterId, - publicIp, privateIp, context); - } - - public static void handleMemberStarted(InstanceStartedEvent instanceStartedEvent) { - Topology topology = TopologyManager.getTopology(); - Service service = topology.getService(instanceStartedEvent.getServiceName()); - if (service == null) { - log.warn(String.format("Service %s does not exist", - instanceStartedEvent.getServiceName())); - return; - } - if (!service.clusterExists(instanceStartedEvent.getClusterId())) { - log.warn(String.format("Cluster %s does not exist in service %s", - instanceStartedEvent.getClusterId(), - instanceStartedEvent.getServiceName())); - return; - } - - Member member = service.getCluster(instanceStartedEvent.getClusterId()). - getMember(instanceStartedEvent.getMemberId()); - if (member == null) { - log.warn(String.format("Member %s does not exist", - instanceStartedEvent.getMemberId())); - return; - } - - try { - TopologyManager.acquireWriteLock(); - // try update lifecycle state - if (!member.isStateTransitionValid(MemberStatus.Starting)) { - log.error("Invalid State Transition from " + member.getStatus() + " to " + MemberStatus.Starting); - } - member.setStatus(MemberStatus.Starting); - log.info("member started event adding status started"); - - TopologyManager.updateTopology(topology); - } finally { - TopologyManager.releaseWriteLock(); - } - //memberStartedEvent. - TopologyEventPublisher.sendMemberStartedEvent(instanceStartedEvent); - //publishing data - CartridgeInstanceDataPublisher.publish(instanceStartedEvent.getMemberId(), - instanceStartedEvent.getPartitionId(), - instanceStartedEvent.getNetworkPartitionId(), - instanceStartedEvent.getClusterId(), - instanceStartedEvent.getServiceName(), - MemberStatus.Starting.toString(), - null); - } - - public static void handleMemberActivated(InstanceActivatedEvent instanceActivatedEvent) { - Topology topology = TopologyManager.getTopology(); - Service service = topology.getService(instanceActivatedEvent.getServiceName()); - if (service == null) { - log.warn(String.format("Service %s does not exist", - instanceActivatedEvent.getServiceName())); - return; - } - - Cluster cluster = service.getCluster(instanceActivatedEvent.getClusterId()); - if (cluster == null) { - log.warn(String.format("Cluster %s does not exist", - instanceActivatedEvent.getClusterId())); - return; - } - - Member member = cluster.getMember(instanceActivatedEvent.getMemberId()); - - if (member == null) { - log.warn(String.format("Member %s does not exist", - instanceActivatedEvent.getMemberId())); - return; - } - - MemberActivatedEvent memberActivatedEvent = new MemberActivatedEvent( - instanceActivatedEvent.getServiceName(), - instanceActivatedEvent.getClusterId(), - instanceActivatedEvent.getNetworkPartitionId(), - instanceActivatedEvent.getPartitionId(), - instanceActivatedEvent.getMemberId(), - instanceActivatedEvent.getInstanceId()); - - // grouping - set grouid - //TODO - memberActivatedEvent.setApplicationId(null); - try { - TopologyManager.acquireWriteLock(); - // try update lifecycle state - if (!member.isStateTransitionValid(MemberStatus.Activated)) { - log.error("Invalid State Transition from " + member.getStatus() + " to " + MemberStatus.Activated); - } - member.setStatus(MemberStatus.Activated); - log.info("member started event adding status activated"); - Cartridge cartridge = FasterLookUpDataHolder.getInstance(). - getCartridge(instanceActivatedEvent.getServiceName()); - - List<PortMapping> portMappings = cartridge.getPortMappings(); - Port port; - //adding ports to the event - for (PortMapping portMapping : portMappings) { - port = new Port(portMapping.getProtocol(), - Integer.parseInt(portMapping.getPort()), - Integer.parseInt(portMapping.getProxyPort())); - member.addPort(port); - memberActivatedEvent.addPort(port); - } - - memberActivatedEvent.setMemberIp(member.getMemberIp()); - memberActivatedEvent.setMemberPublicIp(member.getMemberPublicIp()); - TopologyManager.updateTopology(topology); - - } finally { - TopologyManager.releaseWriteLock(); - } - TopologyEventPublisher.sendMemberActivatedEvent(memberActivatedEvent); - //publishing data - CartridgeInstanceDataPublisher.publish(memberActivatedEvent.getMemberId(), - memberActivatedEvent.getPartitionId(), - memberActivatedEvent.getNetworkPartitionId(), - memberActivatedEvent.getClusterId(), - memberActivatedEvent.getServiceName(), - MemberStatus.Activated.toString(), - null); - } - - public static void handleMemberReadyToShutdown(InstanceReadyToShutdownEvent instanceReadyToShutdownEvent) - throws InvalidMemberException, InvalidCartridgeTypeException { - Topology topology = TopologyManager.getTopology(); - Service service = topology.getService(instanceReadyToShutdownEvent.getServiceName()); - //update the status of the member - if (service == null) { - log.warn(String.format("Service %s does not exist", - instanceReadyToShutdownEvent.getServiceName())); - return; - } - - Cluster cluster = service.getCluster(instanceReadyToShutdownEvent.getClusterId()); - if (cluster == null) { - log.warn(String.format("Cluster %s does not exist", - instanceReadyToShutdownEvent.getClusterId())); - return; - } - - - Member member = cluster.getMember(instanceReadyToShutdownEvent.getMemberId()); - if (member == null) { - log.warn(String.format("Member %s does not exist", - instanceReadyToShutdownEvent.getMemberId())); - return; - } - MemberReadyToShutdownEvent memberReadyToShutdownEvent = new MemberReadyToShutdownEvent( - instanceReadyToShutdownEvent.getServiceName(), - instanceReadyToShutdownEvent.getClusterId(), - instanceReadyToShutdownEvent.getNetworkPartitionId(), - instanceReadyToShutdownEvent.getPartitionId(), - instanceReadyToShutdownEvent.getMemberId(), - instanceReadyToShutdownEvent.getInstanceId()); - try { - TopologyManager.acquireWriteLock(); - - if (!member.isStateTransitionValid(MemberStatus.ReadyToShutDown)) { - log.error("Invalid State Transition from " + member.getStatus() + " to " + MemberStatus.ReadyToShutDown); - } - member.setStatus(MemberStatus.ReadyToShutDown); - log.info("Member Ready to shut down event adding status started"); - - TopologyManager.updateTopology(topology); - } finally { - TopologyManager.releaseWriteLock(); - } - TopologyEventPublisher.sendMemberReadyToShutdownEvent(memberReadyToShutdownEvent); - //publishing data - CartridgeInstanceDataPublisher.publish(instanceReadyToShutdownEvent.getMemberId(), - instanceReadyToShutdownEvent.getPartitionId(), - instanceReadyToShutdownEvent.getNetworkPartitionId(), - instanceReadyToShutdownEvent.getClusterId(), - instanceReadyToShutdownEvent.getServiceName(), - MemberStatus.ReadyToShutDown.toString(), - null); - //termination of particular instance will be handled by autoscaler - } - - public static void handleMemberMaintenance(InstanceMaintenanceModeEvent instanceMaintenanceModeEvent) - throws InvalidMemberException, InvalidCartridgeTypeException { - Topology topology = TopologyManager.getTopology(); - Service service = topology.getService(instanceMaintenanceModeEvent.getServiceName()); - //update the status of the member - if (service == null) { - log.warn(String.format("Service %s does not exist", - instanceMaintenanceModeEvent.getServiceName())); - return; - } - - Cluster cluster = service.getCluster(instanceMaintenanceModeEvent.getClusterId()); - if (cluster == null) { - log.warn(String.format("Cluster %s does not exist", - instanceMaintenanceModeEvent.getClusterId())); - return; - } - - Member member = cluster.getMember(instanceMaintenanceModeEvent.getMemberId()); - if (member == null) { - log.warn(String.format("Member %s does not exist", - instanceMaintenanceModeEvent.getMemberId())); - return; - } - - - MemberMaintenanceModeEvent memberMaintenanceModeEvent = new MemberMaintenanceModeEvent( - instanceMaintenanceModeEvent.getServiceName(), - instanceMaintenanceModeEvent.getClusterId(), - instanceMaintenanceModeEvent.getNetworkPartitionId(), - instanceMaintenanceModeEvent.getPartitionId(), - instanceMaintenanceModeEvent.getMemberId(), - instanceMaintenanceModeEvent.getInstanceId()); - try { - TopologyManager.acquireWriteLock(); - // try update lifecycle state - if (!member.isStateTransitionValid(MemberStatus.In_Maintenance)) { - log.error("Invalid State Transition from " + member.getStatus() + " to " + MemberStatus.In_Maintenance); - } - member.setStatus(MemberStatus.In_Maintenance); - log.info("member maintenance mode event adding status started"); - - TopologyManager.updateTopology(topology); - } finally { - TopologyManager.releaseWriteLock(); - } - //publishing data - TopologyEventPublisher.sendMemberMaintenanceModeEvent(memberMaintenanceModeEvent); - - } - - public static void handleMemberTerminated(String serviceName, String clusterId, - String networkPartitionId, String partitionId, - String memberId) { - Topology topology = TopologyManager.getTopology(); - Service service = topology.getService(serviceName); - Properties properties; - if (service == null) { - log.warn(String.format("Service %s does not exist", - serviceName)); - return; - } - Cluster cluster = service.getCluster(clusterId); - if (cluster == null) { - log.warn(String.format("Cluster %s does not exist", - clusterId)); - return; - } - - Member member = cluster.getMember(memberId); - String instanceId = member.getInstanceId(); - - if (member == null) { - log.warn(String.format("Member with member id %s does not exist", - memberId)); - return; - } - - try { - TopologyManager.acquireWriteLock(); - properties = member.getProperties(); - cluster.removeMember(member); - TopologyManager.updateTopology(topology); - } finally { - TopologyManager.releaseWriteLock(); - } - /* @TODO leftover from grouping_poc*/ - String groupAlias = null; - TopologyEventPublisher.sendMemberTerminatedEvent(serviceName, clusterId, networkPartitionId, - partitionId, memberId, properties, groupAlias, instanceId); - } - - public static void handleMemberSuspended() { - //TODO - try { - TopologyManager.acquireWriteLock(); - } finally { - TopologyManager.releaseWriteLock(); - } - } - - public static void handleClusterActivatedEvent(ClusterStatusClusterActivatedEvent clusterActivatedEvent) { - - Topology topology = TopologyManager.getTopology(); - Service service = topology.getService(clusterActivatedEvent.getServiceName()); - //update the status of the cluster - if (service == null) { - log.warn(String.format("Service %s does not exist", - clusterActivatedEvent.getServiceName())); - return; - } - - Cluster cluster = service.getCluster(clusterActivatedEvent.getClusterId()); - if (cluster == null) { - log.warn(String.format("Cluster %s does not exist", - clusterActivatedEvent.getClusterId())); - return; - } - - org.apache.stratos.messaging.event.topology.ClusterActivatedEvent clusterActivatedEvent1 = - new org.apache.stratos.messaging.event.topology.ClusterActivatedEvent( - clusterActivatedEvent.getAppId(), - clusterActivatedEvent.getServiceName(), - clusterActivatedEvent.getClusterId(), - clusterActivatedEvent.getInstanceId()); - try { - TopologyManager.acquireWriteLock(); - ClusterInstance context = cluster.getInstanceContexts(clusterActivatedEvent.getInstanceId()); - if (context == null) { - log.warn("Cluster Instance Context is not found for [cluster] " + - clusterActivatedEvent.getClusterId() + " [instance-id] " + - clusterActivatedEvent.getInstanceId()); - return; - } - ClusterStatus status = ClusterStatus.Active; - if(context.isStateTransitionValid(status)) { - context.setStatus(status); - log.info("Cluster activated adding status started for" + cluster.getClusterId()); - TopologyManager.updateTopology(topology); - //publishing data - TopologyEventPublisher.sendClusterActivatedEvent(clusterActivatedEvent1); - } else { - log.warn(String.format("Cluster state transition is not valid: [cluster-id] %s " + - " [instance-id] %s [current-status] %s [status-requested] %s", - clusterActivatedEvent.getClusterId(), clusterActivatedEvent.getInstanceId(), - context.getStatus(), status)); - } - } finally { - TopologyManager.releaseWriteLock(); - } - - } - - public static void handleClusterInActivateEvent( - ClusterStatusClusterInactivateEvent clusterInActivateEvent) { - Topology topology = TopologyManager.getTopology(); - Service service = topology.getService(clusterInActivateEvent.getServiceName()); - //update the status of the cluster - if (service == null) { - log.warn(String.format("Service %s does not exist", - clusterInActivateEvent.getServiceName())); - return; - } - - Cluster cluster = service.getCluster(clusterInActivateEvent.getClusterId()); - if (cluster == null) { - log.warn(String.format("Cluster %s does not exist", - clusterInActivateEvent.getClusterId())); - return; - } - - ClusterInactivateEvent clusterInActivatedEvent1 = - new ClusterInactivateEvent( - clusterInActivateEvent.getAppId(), - clusterInActivateEvent.getServiceName(), - clusterInActivateEvent.getClusterId(), - clusterInActivateEvent.getInstanceId()); - try { - TopologyManager.acquireWriteLock(); - ClusterInstance context = cluster.getInstanceContexts(clusterInActivateEvent.getInstanceId()); - if (context == null) { - log.warn("Cluster Instance Context is not found for [cluster] " + - clusterInActivateEvent.getClusterId() + " [instance-id] " + - clusterInActivateEvent.getInstanceId()); - return; - } - ClusterStatus status = ClusterStatus.Inactive; - if(context.isStateTransitionValid(status)) { - context.setStatus(status); - log.info("Cluster InActive adding status started for" + cluster.getClusterId()); - TopologyManager.updateTopology(topology); - //publishing data - TopologyEventPublisher.sendClusterInactivateEvent(clusterInActivatedEvent1); - } else { - log.warn(String.format("Cluster state transition is not valid: [cluster-id] %s " + - " [instance-id] %s [current-status] %s [status-requested] %s", - clusterInActivateEvent.getClusterId(), clusterInActivateEvent.getInstanceId(), - context.getStatus(), status)); - } - } finally { - TopologyManager.releaseWriteLock(); - } - } - - - private static void deleteAppResourcesFromMetadataService(ApplicationTerminatedEvent event) { - try { - MetaDataServiceClient metadataClient = new DefaultMetaDataServiceClient(); - metadataClient.deleteApplicationProperties(event.getAppId()); - } catch (Exception e) { - log.error("Error occurred while deleting the application resources frm metadata service ", e); - } - } - - public static void handleClusterTerminatedEvent(ClusterStatusClusterTerminatedEvent event) { - - TopologyManager.acquireWriteLock(); - - try { - Topology topology = TopologyManager.getTopology(); - Cluster cluster = topology.getService(event.getServiceName()). - getCluster(event.getClusterId()); - - if (!cluster.isStateTransitionValid(ClusterStatus.Terminated, null)) { - log.error("Invalid state transfer from " + cluster.getStatus(null) + " to " + - ClusterStatus.Terminated); - } - ClusterInstance context = cluster.getInstanceContexts(event.getInstanceId()); - if (context == null) { - log.warn("Cluster Instance Context is not found for [cluster] " + - event.getClusterId() + " [instance-id] " + - event.getInstanceId()); - return; - } - ClusterStatus status = ClusterStatus.Terminated; - if(context.isStateTransitionValid(status)) { - context.setStatus(status); - log.info("Cluster Terminated adding status started for" + cluster.getClusterId()); - TopologyManager.updateTopology(topology); - //publishing data - ClusterTerminatedEvent clusterTerminatedEvent = new ClusterTerminatedEvent(event.getAppId(), - event.getServiceName(), event.getClusterId(), event.getInstanceId()); - - TopologyEventPublisher.sendClusterTerminatedEvent(clusterTerminatedEvent); - } else { - log.warn(String.format("Cluster state transition is not valid: [cluster-id] %s " + - " [instance-id] %s [current-status] %s [status-requested] %s", - event.getClusterId(), event.getInstanceId(), - context.getStatus(), status)); - } - } finally { - TopologyManager.releaseWriteLock(); - } - - - } - - public static void handleClusterTerminatingEvent(ClusterStatusClusterTerminatingEvent event) { - - TopologyManager.acquireWriteLock(); - - try { - Topology topology = TopologyManager.getTopology(); - Cluster cluster = topology.getService(event.getServiceName()). - getCluster(event.getClusterId()); - - if (!cluster.isStateTransitionValid(ClusterStatus.Terminating, null)) { - log.error("Invalid state transfer from " + cluster.getStatus(null) + " to " + - ClusterStatus.Terminating); - } - ClusterInstance context = cluster.getInstanceContexts(event.getInstanceId()); - if (context == null) { - log.warn("Cluster Instance Context is not found for [cluster] " + - event.getClusterId() + " [instance-id] " + - event.getInstanceId()); - return; - } - ClusterStatus status = ClusterStatus.Terminating; - if(context.isStateTransitionValid(status)) { - context.setStatus(status); - log.info("Cluster Terminating adding status started for" + cluster.getClusterId()); - TopologyManager.updateTopology(topology); - //publishing data - ClusterTerminatingEvent clusterTerminaingEvent = new ClusterTerminatingEvent(event.getAppId(), - event.getServiceName(), event.getClusterId(), null); - - TopologyEventPublisher.sendClusterTerminatingEvent(clusterTerminaingEvent); - } else { - log.warn(String.format("Cluster state transition is not valid: [cluster-id] %s " + - " [instance-id] %s [current-status] %s [status-requested] %s", - event.getClusterId(), event.getInstanceId(), - context.getStatus(), status)); - } - } finally { - TopologyManager.releaseWriteLock(); - } - } -}
http://git-wip-us.apache.org/repos/asf/stratos/blob/8c359dc1/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java deleted file mode 100644 index 6d85c77..0000000 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java +++ /dev/null @@ -1,321 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -package org.apache.stratos.cloud.controller.topology; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.stratos.cloud.controller.pojo.Cartridge; -import org.apache.stratos.cloud.controller.pojo.ClusterContext; -import org.apache.stratos.cloud.controller.pojo.MemberContext; -import org.apache.stratos.cloud.controller.pojo.PortMapping; -import org.apache.stratos.cloud.controller.util.CloudControllerUtil; -import org.apache.stratos.messaging.broker.publish.EventPublisher; -import org.apache.stratos.messaging.broker.publish.EventPublisherPool; -import org.apache.stratos.messaging.domain.applications.ClusterDataHolder; -import org.apache.stratos.messaging.domain.topology.Cluster; -import org.apache.stratos.messaging.domain.topology.Port; -import org.apache.stratos.messaging.domain.topology.ServiceType; -import org.apache.stratos.messaging.domain.topology.Topology; -import org.apache.stratos.messaging.event.Event; -import org.apache.stratos.messaging.event.applications.*; -import org.apache.stratos.messaging.event.instance.status.InstanceStartedEvent; -import org.apache.stratos.messaging.event.topology.*; -import org.apache.stratos.messaging.util.Util; - -import java.util.List; -import java.util.Properties; -import java.util.Set; - -/** - * this is to send the relevant events from cloud controller to topology topic - */ -public class TopologyEventPublisher { - private static final Log log = LogFactory.getLog(TopologyEventPublisher.class); - - public static void sendServiceCreateEvent(List<Cartridge> cartridgeList) { - ServiceCreatedEvent serviceCreatedEvent; - for (Cartridge cartridge : cartridgeList) { - serviceCreatedEvent = new ServiceCreatedEvent(cartridge.getType(), - (cartridge.isMultiTenant() ? ServiceType.MultiTenant - : ServiceType.SingleTenant)); - - // Add ports to the event - Port port; - List<PortMapping> portMappings = cartridge.getPortMappings(); - for (PortMapping portMapping : portMappings) { - port = new Port(portMapping.getProtocol(), - Integer.parseInt(portMapping.getPort()), - Integer.parseInt(portMapping.getProxyPort())); - serviceCreatedEvent.addPort(port); - } - - if (log.isInfoEnabled()) { - log.info(String.format( - "Publishing service created event: [service] %s", - cartridge.getType())); - } - publishEvent(serviceCreatedEvent); - } - } - - public static void sendServiceRemovedEvent(List<Cartridge> cartridgeList) { - ServiceRemovedEvent serviceRemovedEvent; - for (Cartridge cartridge : cartridgeList) { - serviceRemovedEvent = new ServiceRemovedEvent(cartridge.getType()); - if (log.isInfoEnabled()) { - log.info(String.format( - "Publishing service removed event: [service] %s", - serviceRemovedEvent.getServiceName())); - } - publishEvent(serviceRemovedEvent); - } - } - - public static void sendClusterResetEvent(String appId, String serviceName, String clusterId, - String instanceId) { - ClusterResetEvent clusterResetEvent = new ClusterResetEvent(appId, serviceName, - clusterId, instanceId); - - if (log.isInfoEnabled()) { - log.info("Publishing cluster reset event: " + clusterId); - } - publishEvent(clusterResetEvent); - } - - public static void sendClusterCreatedEvent(Cluster cluster) { - ClusterCreatedEvent clusterCreatedEvent = new ClusterCreatedEvent(cluster); - - if (log.isInfoEnabled()) { - log.info("Publishing cluster created event: " + cluster.getClusterId()); - } - publishEvent(clusterCreatedEvent); - } - - public static void sendApplicationClustersCreated(String appId, List<Cluster> clusters) { - - if (log.isInfoEnabled()) { - log.info("Publishing Application Clusters Created event for Application: " + appId); - } - - publishEvent(new ApplicationClustersCreatedEvent(clusters, appId)); - } - - public static void sendApplicationClustersRemoved(String appId, Set<ClusterDataHolder> clusters) { - - if (log.isInfoEnabled()) { - log.info("Publishing Application Clusters removed event for Application: " + appId); - } - - publishEvent(new ApplicationClustersRemovedEvent(clusters, appId)); - } - - public static void sendClusterRemovedEvent(ClusterContext ctxt, String deploymentPolicy) { - ClusterRemovedEvent clusterRemovedEvent = new ClusterRemovedEvent( - ctxt.getCartridgeType(), ctxt.getClusterId(), deploymentPolicy, ctxt.isLbCluster()); - if (log.isInfoEnabled()) { - log.info(String - .format("Publishing cluster removed event: [service] %s [cluster] %s", - ctxt.getCartridgeType(), ctxt.getClusterId())); - } - publishEvent(clusterRemovedEvent); - - } - - public static void sendInstanceSpawnedEvent(String serviceName, - String clusterId, String networkPartitionId, String partitionId, - String memberId, String lbClusterId, String publicIp, - String privateIp, MemberContext context) { - - long initTime = context.getInitTime(); - InstanceSpawnedEvent instanceSpawnedEvent = new InstanceSpawnedEvent( - serviceName, clusterId, networkPartitionId, partitionId, - memberId, initTime, context.getInstanceId()); - instanceSpawnedEvent.setLbClusterId(lbClusterId); - instanceSpawnedEvent.setMemberIp(privateIp); - instanceSpawnedEvent.setMemberPublicIp(publicIp); - instanceSpawnedEvent.setProperties(CloudControllerUtil - .toJavaUtilProperties(context.getProperties())); - log.info(String.format("Publishing instance spawned event: [service] %s [cluster] %s " + - " [instance-id] %s [network-partition] %s [partition] %s " + - "[member]%s [lb-cluster-id] %s", - serviceName, clusterId, context.getInstanceId(), networkPartitionId, partitionId, - memberId, lbClusterId)); - publishEvent(instanceSpawnedEvent); - } - - public static void sendMemberStartedEvent(InstanceStartedEvent instanceStartedEvent) { - MemberStartedEvent memberStartedEventTopology = new MemberStartedEvent(instanceStartedEvent.getServiceName(), - instanceStartedEvent.getClusterId(), instanceStartedEvent.getNetworkPartitionId(), - instanceStartedEvent.getPartitionId(), instanceStartedEvent.getMemberId(), - instanceStartedEvent.getInstanceId()); - if (log.isInfoEnabled()) { - log.info(String - .format("Publishing member started event: [service] %s [cluster] %s [instance-id] %s " + - "[network-partition] %s [partition] %s [member] %s", - instanceStartedEvent.getServiceName(), - instanceStartedEvent.getClusterId(), - instanceStartedEvent.getInstanceId(), - instanceStartedEvent.getNetworkPartitionId(), - instanceStartedEvent.getPartitionId(), - instanceStartedEvent.getMemberId())); - } - publishEvent(memberStartedEventTopology); - } - - public static void sendMemberActivatedEvent( - MemberActivatedEvent memberActivatedEvent) { - if (log.isInfoEnabled()) { - log.info(String - .format("Publishing member activated event: [service] %s [cluster] %s " + - "[instance-id] %s [network-partition] %s [partition] %s [member] %s", - memberActivatedEvent.getServiceName(), - memberActivatedEvent.getClusterId(), - memberActivatedEvent.getInstanceId(), - memberActivatedEvent.getNetworkPartitionId(), - memberActivatedEvent.getPartitionId(), - memberActivatedEvent.getMemberId())); - } - publishEvent(memberActivatedEvent); - } - - public static void sendMemberReadyToShutdownEvent(MemberReadyToShutdownEvent memberReadyToShutdownEvent) { - if (log.isInfoEnabled()) { - log.info(String.format("Publishing member Ready to shut down event: [service] %s " + - " [instance-id] %s [cluster] %s [network-partition] %s [partition] %s " + - "[member] %s [groupId] %s", - memberReadyToShutdownEvent.getServiceName(), - memberReadyToShutdownEvent.getClusterId(), - memberReadyToShutdownEvent.getInstanceId(), - memberReadyToShutdownEvent.getNetworkPartitionId(), - memberReadyToShutdownEvent.getPartitionId(), - memberReadyToShutdownEvent.getMemberId(), - memberReadyToShutdownEvent.getGroupId())); - } - // grouping - memberReadyToShutdownEvent.setGroupId(memberReadyToShutdownEvent.getGroupId()); - publishEvent(memberReadyToShutdownEvent); - } - - public static void sendMemberMaintenanceModeEvent(MemberMaintenanceModeEvent memberMaintenanceModeEvent) { - if (log.isInfoEnabled()) { - log.info(String.format("Publishing Maintenance mode event: [service] %s [cluster] %s " + - " [instance-id] %s [network-partition] %s [partition] %s [member] %s " + - "[groupId] %s", memberMaintenanceModeEvent.getServiceName(), - memberMaintenanceModeEvent.getClusterId(), - memberMaintenanceModeEvent.getInstanceId(), - memberMaintenanceModeEvent.getNetworkPartitionId(), - memberMaintenanceModeEvent.getPartitionId(), - memberMaintenanceModeEvent.getMemberId(), - memberMaintenanceModeEvent.getGroupId())); - } - - publishEvent(memberMaintenanceModeEvent); - } - - public static void sendClusterActivatedEvent(ClusterActivatedEvent clusterActivatedEvent) { - if (log.isInfoEnabled()) { - log.info(String.format("Publishing cluster activated event: [service] %s [cluster] %s " + - " [instance-id] %s [appId] %s", - clusterActivatedEvent.getServiceName(), - clusterActivatedEvent.getClusterId(), - clusterActivatedEvent.getInstanceId(), - clusterActivatedEvent.getAppId())); - } - publishEvent(clusterActivatedEvent); - } - - public static void sendClusterInactivateEvent(ClusterInactivateEvent clusterInactiveEvent) { - if (log.isInfoEnabled()) { - log.info(String.format("Publishing cluster in-active event: [service] %s [cluster] %s " + - "[instance-id] %s [appId] %s", - clusterInactiveEvent.getServiceName(), clusterInactiveEvent.getClusterId(), - clusterInactiveEvent.getInstanceId(), clusterInactiveEvent.getAppId())); - } - publishEvent(clusterInactiveEvent); - } - - public static void sendClusterInstanceCreatedEvent(ClusterInstanceCreatedEvent clusterInstanceCreatedEvent) { - if (log.isInfoEnabled()) { - log.info(String.format("Publishing cluster Instance Created event: [service] %s [cluster] %s " + - "[instance-id] %s", - clusterInstanceCreatedEvent.getServiceName(), clusterInstanceCreatedEvent.getClusterId(), - clusterInstanceCreatedEvent.getInstanceId())); - } - publishEvent(clusterInstanceCreatedEvent); - } - - - public static void sendMemberTerminatedEvent(String serviceName, String clusterId, String networkPartitionId, - String partitionId, String memberId, - Properties properties, String groupId, String instanceId) { - MemberTerminatedEvent memberTerminatedEvent = new MemberTerminatedEvent(serviceName, clusterId, - networkPartitionId, partitionId, memberId, instanceId); - memberTerminatedEvent.setProperties(properties); - memberTerminatedEvent.setGroupId(groupId); - - if (log.isInfoEnabled()) { - log.info(String.format("Publishing member terminated event: [service] %s [cluster] %s " + - " [instance-id] %s [network-partition] %s [partition] %s [member] %s " + - "[groupId] %s", serviceName, clusterId, instanceId, networkPartitionId, - partitionId, memberId, groupId)); - } - - publishEvent(memberTerminatedEvent); - } - - public static void sendCompleteTopologyEvent(Topology topology) { - CompleteTopologyEvent completeTopologyEvent = new CompleteTopologyEvent(topology); - - if (log.isDebugEnabled()) { - log.debug(String.format("Publishing complete topology event")); - } - publishEvent(completeTopologyEvent); - } - - public static void sendClusterTerminatingEvent(ClusterTerminatingEvent clusterTerminatingEvent) { - - if (log.isInfoEnabled()) { - log.info(String.format("Publishing Cluster terminating event: [appId] %s [cluster id] %s" + - " [instance-id] %s ", - clusterTerminatingEvent.getAppId(), clusterTerminatingEvent.getClusterId(), - clusterTerminatingEvent.getInstanceId())); - } - - publishEvent(clusterTerminatingEvent); - } - - public static void sendClusterTerminatedEvent(ClusterTerminatedEvent clusterTerminatedEvent) { - - if (log.isInfoEnabled()) { - log.info(String.format("Publishing Cluster terminated event: [appId] %s [cluster id] %s" + - " [instance-id] %s ", - clusterTerminatedEvent.getAppId(), clusterTerminatedEvent.getClusterId(), - clusterTerminatedEvent.getInstanceId())); - } - - publishEvent(clusterTerminatedEvent); - } - - public static void publishEvent(Event event) { - String topic = Util.getMessageTopicName(event); - EventPublisher eventPublisher = EventPublisherPool.getPublisher(topic); - eventPublisher.publish(event); - } -} http://git-wip-us.apache.org/repos/asf/stratos/blob/8c359dc1/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyManager.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyManager.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyManager.java deleted file mode 100644 index 9862b9a..0000000 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyManager.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.stratos.cloud.controller.topology; - -import com.google.gson.Gson; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.stratos.cloud.controller.util.CloudControllerUtil; -import org.apache.stratos.messaging.domain.topology.Topology; - -import java.util.concurrent.locks.ReentrantReadWriteLock; - -/** - * Persistence and retrieval of Topology from Registry - */ -public class TopologyManager { - private static final Log log = LogFactory.getLog(TopologyManager.class); - - private static volatile ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); - private static volatile ReentrantReadWriteLock.ReadLock readLock = lock.readLock(); - private static volatile ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock(); - private static volatile Topology topology; - - private TopologyManager() { - } - - public static void acquireReadLock() { - if(log.isDebugEnabled()) { - log.debug("Read lock acquired"); - } - readLock.lock(); - } - - public static void releaseReadLock() { - if(log.isDebugEnabled()) { - log.debug("Read lock released"); - } - readLock.unlock(); - } - - public static void acquireWriteLock() { - if(log.isDebugEnabled()) { - log.debug("Write lock acquired"); - } - writeLock.lock(); - } - - public static void releaseWriteLock() { - if(log.isDebugEnabled()) { - log.debug("Write lock released"); - } - writeLock.unlock(); - } - - public static Topology getTopology() { - if (topology == null) { - synchronized (TopologyManager.class) { - if (topology == null) { - if (log.isDebugEnabled()) { - log.debug("Trying to retrieve topology from registry"); - } - topology = CloudControllerUtil.retrieveTopology(); - if (topology == null) { - if (log.isDebugEnabled()) { - log.debug("Topology not found in registry, creating new"); - } - topology = new Topology(); - } - if (log.isDebugEnabled()) { - log.debug("Topology initialized"); - } - } - } - } - return topology; - } - - /** - * Update in-memory topology and persist it in registry. - * @param topology_ - */ - public static void updateTopology(Topology topology_) { - synchronized (TopologyManager.class) { - if (log.isDebugEnabled()) { - log.debug("Updating topology"); - } - topology = topology_; - CloudControllerUtil.persistTopology(topology); - if (log.isDebugEnabled()) { - log.debug(String.format("Topology updated: %s", toJson(topology))); - } - } - - } - - private static String toJson(Object object) { - Gson gson = new Gson(); - return gson.toJson(object); - } -} - http://git-wip-us.apache.org/repos/asf/stratos/blob/8c359dc1/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologySynchronizerTask.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologySynchronizerTask.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologySynchronizerTask.java deleted file mode 100644 index be3051e..0000000 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologySynchronizerTask.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.stratos.cloud.controller.topology; - -import java.util.Map; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.stratos.cloud.controller.runtime.FasterLookUpDataHolder; -import org.wso2.carbon.ntask.core.Task; - -public class TopologySynchronizerTask implements Task{ - private static final Log log = LogFactory.getLog(TopologySynchronizerTask.class); - - @Override - public void execute() { - if (log.isDebugEnabled()) { - log.debug("Executing topology synchronization task"); - } - - if(FasterLookUpDataHolder.getInstance().isTopologySyncRunning() || - // this is a temporary fix to avoid task execution - limitation with ntask - (!FasterLookUpDataHolder.getInstance().getEnableTopologySync())){ - if(log.isWarnEnabled()) { - log.warn("Topology synchronization is disabled."); - } - return; - } - - // publish to the topic - if (TopologyManager.getTopology() != null) { - TopologyEventPublisher.sendCompleteTopologyEvent(TopologyManager.getTopology()); - } - } - - @Override - public void init() { - - // this is a temporary fix to avoid task execution - limitation with ntask - if(!FasterLookUpDataHolder.getInstance().getEnableTopologySync()){ - if(log.isWarnEnabled()) { - log.warn("Topology synchronization is disabled."); - } - return; - } - } - - @Override - public void setProperties(Map<String, String> arg0) {} - -} http://git-wip-us.apache.org/repos/asf/stratos/blob/8c359dc1/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/AxiomXpathParserUtil.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/AxiomXpathParserUtil.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/AxiomXpathParserUtil.java new file mode 100644 index 0000000..913289d --- /dev/null +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/AxiomXpathParserUtil.java @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.cloud.controller.util; + +import org.apache.axiom.om.OMElement; +import org.apache.axiom.om.OMFactory; +import org.apache.axiom.om.OMNode; +import org.apache.axiom.om.impl.builder.StAXOMBuilder; +import org.apache.axiom.om.impl.dom.DOOMAbstractFactory; +import org.apache.axiom.om.impl.dom.ElementImpl; +import org.apache.axiom.om.xpath.AXIOMXPath; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.cloud.controller.exception.MalformedConfigurationFileException; +import org.jaxen.JaxenException; +import org.w3c.dom.Element; +import org.wso2.securevault.SecretResolver; +import org.wso2.securevault.SecretResolverFactory; +import org.xml.sax.SAXException; + +import javax.xml.XMLConstants; +import javax.xml.namespace.QName; +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.XMLStreamReader; +import javax.xml.transform.Source; +import javax.xml.transform.dom.DOMSource; +import javax.xml.transform.stream.StreamSource; +import javax.xml.validation.Schema; +import javax.xml.validation.SchemaFactory; +import javax.xml.validation.Validator; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.*; + +/** + * This class is parsing configuration files using Axiom Xpath. + */ +public class AxiomXpathParserUtil { + + private static final Log LOG = LogFactory.getLog(AxiomXpathParserUtil.class); + + private AxiomXpathParserUtil(){} + + public static OMElement parse(File xmlSource) throws MalformedConfigurationFileException, + IllegalArgumentException { + + OMElement documentElement; + + if (xmlSource == null) { + String msg = "File is null."; + LOG.error(msg); + throw new IllegalArgumentException(msg); + } + + try { + documentElement = new StAXOMBuilder(xmlSource.getPath()).getDocumentElement(); + return documentElement; + + } catch (XMLStreamException e) { + String msg = "Failed to parse the configuration file : " + xmlSource.getPath(); + LOG.error(msg, e); + throw new MalformedConfigurationFileException(msg, e); + } catch (FileNotFoundException e) { + String msg = "Configuration file cannot be found : " + xmlSource.getPath(); + LOG.error(msg); + throw new MalformedConfigurationFileException(msg); + } + + } + + private static Element getDOMElement(final OMElement omElement) { + + // Get the StAX reader from the created element + XMLStreamReader llomReader = omElement.getXMLStreamReader(); + + // Create the DOOM OMFactory + OMFactory doomFactory = DOOMAbstractFactory.getOMFactory(); + + // Create the new builder + StAXOMBuilder doomBuilder = new StAXOMBuilder(doomFactory, llomReader); + + // Get the document element + OMElement newElem = doomBuilder.getDocumentElement(); + + return newElem instanceof Element ? (Element) newElem : null; + } + + private static OMElement getElement(final Object obj) { + OMNode node; + if ((obj instanceof OMNode) && (node = (OMNode) obj).getType() == OMNode.ELEMENT_NODE) { + + OMElement element = (OMElement) node; + + return element; + + } + + return null; + } + + public static OMElement getElement(final String fileName, final OMElement rootElt, + final String eltStr, final String xpath) { + List<?> nodes = getMatchingNodes(xpath, rootElt); + neglectingWarn(fileName, eltStr, nodes.size()); + OMElement element = getElement(nodes.get(0)); + return element; + } + + public static OMElement getFirstChildElement(final OMElement root, final String childName) { + Iterator<?> it = root.getChildrenWithName(new QName(childName)); + if (it.hasNext()) { + return (OMElement) it.next(); + } + + return null; + } + + private static void neglectingWarn(final String fileName, final String elt, final int size) { + if (size > 1) { + LOG.warn(fileName + " contains more than one " + elt + " elements!" + + " Elements other than the first will be neglected."); + } + } + + public static void plainTextWarn(final String elt) { + LOG.warn("Unable to find a value for " + elt + " element from Secure Vault." + + "Hence we will try to assign the plain text value (if specified)."); + } + + /** + * @param xpath + * XPATH expression to be read. + * @param elt + * OMElement to be used for the search. + * @return List matching OMNode list + */ + @SuppressWarnings("unchecked") + public static OMNode getFirstMatchingNode(final String xpath, final OMElement elt) throws MalformedConfigurationFileException{ + + AXIOMXPath axiomXpath; + List<OMNode> nodeList = null; + try { + axiomXpath = new AXIOMXPath(xpath); + nodeList = axiomXpath.selectNodes(elt); + return nodeList.isEmpty() ? null : nodeList.get(0); + } catch (JaxenException e) { + String msg = "Error occurred while reading the Xpath (" + xpath + ")"; + LOG.error(msg, e); + throw new MalformedConfigurationFileException(msg, e); + } + + } + + /** + * @param xpath + * XPATH expression to be read. + * @return List matching list + */ + @SuppressWarnings("unchecked") + public static List<OMNode> getMatchingNodes(OMElement elt, final String xpath) throws MalformedConfigurationFileException{ + + AXIOMXPath axiomXpath; + List<OMNode> nodeList = null; + try { + axiomXpath = new AXIOMXPath(xpath); + nodeList = axiomXpath.selectNodes(elt); + return nodeList; + } catch (JaxenException e) { + String msg = "Error occurred while reading the Xpath (" + xpath + ")"; + LOG.error(msg, e); + throw new MalformedConfigurationFileException(msg, e); + } + + } + + /** + * @param xpath + * XPATH expression to be read. + * @param elt + * OMElement to be used for the search. + * @return List matching OMNode list + */ + @SuppressWarnings("unchecked") + public static List<OMNode> getMatchingNodes(final String xpath, final OMElement elt) throws MalformedConfigurationFileException{ + + AXIOMXPath axiomXpath; + List<OMNode> nodeList = null; + try { + axiomXpath = new AXIOMXPath(xpath); + nodeList = axiomXpath.selectNodes(elt); + return nodeList; + } catch (JaxenException e) { + String msg = "Error occurred while reading the Xpath (" + xpath + ")"; + LOG.error(msg, e); + throw new MalformedConfigurationFileException(msg, e); + } + + } + + public static void validate(final OMElement omElement, final File schemaFile) throws SAXException, IOException { + + Element sourceElement; + + // if the OMElement is created using DOM implementation use it + if (omElement instanceof ElementImpl) { + sourceElement = (Element) omElement; + } else { // else convert from llom to dom + sourceElement = getDOMElement(omElement); + } + + // Create a SchemaFactory capable of understanding WXS schemas. + + // Load a WXS schema, represented by a Schema instance. + SchemaFactory factory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI); + Source source = new StreamSource(schemaFile); + + // Create a Validator object, which can be used to validate + // an instance document. + Schema schema = factory.newSchema(source); + Validator validator = schema.newValidator(); + + // Validate the DOM tree. + validator.validate(new DOMSource(sourceElement)); + } + + public static String resolveSecret(final OMElement docElt, final OMElement elt) { + // retrieve the value using secure vault + SecretResolver secretResolver = SecretResolverFactory.create(docElt, false); + + String alias = elt.getAttributeValue(new QName( + CloudControllerConstants.ALIAS_NAMESPACE, + CloudControllerConstants.ALIAS_ATTRIBUTE, + CloudControllerConstants.ALIAS_ATTRIBUTE_PREFIX)); + + // retrieve the secured password + if (secretResolver != null && secretResolver.isInitialized() && + secretResolver.isTokenProtected(alias)) { + return secretResolver.resolve(alias); + } + return null; + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/8c359dc1/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerUtil.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerUtil.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerUtil.java index ef37078..3808d13 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerUtil.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerUtil.java @@ -21,15 +21,14 @@ package org.apache.stratos.cloud.controller.util; import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.stratos.cloud.controller.deployment.partition.Partition; +import org.apache.stratos.cloud.controller.domain.*; +import org.apache.stratos.cloud.controller.domain.Partition; import org.apache.stratos.cloud.controller.exception.CloudControllerException; import org.apache.stratos.cloud.controller.exception.InvalidIaasProviderException; -import org.apache.stratos.cloud.controller.interfaces.Iaas; -import org.apache.stratos.cloud.controller.jcloud.ComputeServiceBuilderUtil; -import org.apache.stratos.cloud.controller.persist.Deserializer; -import org.apache.stratos.cloud.controller.pojo.*; +import org.apache.stratos.cloud.controller.iaas.Iaas; +import org.apache.stratos.cloud.controller.registry.Deserializer; import org.apache.stratos.cloud.controller.registry.RegistryManager; -import org.apache.stratos.cloud.controller.runtime.FasterLookUpDataHolder; +import org.apache.stratos.cloud.controller.context.FasterLookUpDataHolder; import org.apache.stratos.common.Property; import org.apache.stratos.messaging.domain.topology.Topology; import org.wso2.carbon.registry.core.exceptions.RegistryException;
