http://git-wip-us.apache.org/repos/asf/stratos/blob/2fd289b8/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java index f04a11f..d636d7e 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java @@ -31,6 +31,7 @@ import org.apache.stratos.cloud.controller.messaging.publisher.TopologyEventPubl import org.apache.stratos.cloud.controller.statistics.publisher.BAMUsageDataPublisher; import org.apache.stratos.cloud.controller.util.CloudControllerUtil; import org.apache.stratos.common.Property; +import org.apache.stratos.common.constants.StratosConstants; import org.apache.stratos.messaging.domain.application.ClusterDataHolder; import org.apache.stratos.messaging.domain.instance.ClusterInstance; import org.apache.stratos.messaging.domain.topology.*; @@ -53,7 +54,6 @@ import java.util.*; public class TopologyBuilder { private static final Log log = LogFactory.getLog(TopologyBuilder.class); - public static void handleServiceCreated(List<Cartridge> cartridgeList) { Service service; Topology topology = TopologyManager.getTopology(); @@ -67,7 +67,8 @@ public class TopologyBuilder { TopologyManager.acquireWriteLock(); for (Cartridge cartridge : cartridgeList) { if (!topology.serviceExists(cartridge.getType())) { - ServiceType serviceType = cartridge.isMultiTenant() ? ServiceType.MultiTenant : ServiceType.SingleTenant; + ServiceType serviceType = cartridge.isMultiTenant() ? ServiceType.MultiTenant : + ServiceType.SingleTenant; service = new Service(cartridge.getType(), serviceType); Properties properties = new Properties(); @@ -99,8 +100,8 @@ public class TopologyBuilder { Port port; //adding ports to the event for (PortMapping portMapping : portMappings) { - port = new Port(portMapping.getProtocol(), - portMapping.getPort(), portMapping.getProxyPort()); + port = new Port(portMapping.getProtocol(), portMapping.getPort(), + portMapping.getProxyPort()); service.addPort(port); } } @@ -138,8 +139,8 @@ public class TopologyBuilder { log.warn(String.format("Service %s does not exist..", cartridge.getType())); } } else { - log.warn("Subscription already exists. Hence not removing the service:" + cartridge.getType() - + " from the topology"); + log.warn("Subscription already exists. Hence not removing the service:" + + cartridge.getType() + " from the topology"); } } } @@ -153,7 +154,7 @@ public class TopologyBuilder { Service service = topology.getService(event.getServiceName()); if (service == null) { log.error("Service " + event.getServiceName() + - " not found in Topology, unable to update the cluster status to Created"); + " not found in Topology, unable to update the cluster status to Created"); return; } @@ -161,9 +162,9 @@ public class TopologyBuilder { log.warn("Cluster " + event.getClusterId() + " is already in the Topology "); return; } else { - cluster = new Cluster(event.getServiceName(), - event.getClusterId(), event.getDeploymentPolicyName(), - event.getAutosScalePolicyName(), event.getAppId()); + cluster = new Cluster(event.getServiceName(), event.getClusterId(), + event.getDeploymentPolicyName(), + event.getAutosScalePolicyName(), event.getAppId()); //cluster.setStatus(Status.Created); cluster.setHostNames(event.getHostNames()); cluster.setTenantRange(event.getTenantRange()); @@ -186,11 +187,12 @@ public class TopologyBuilder { for (Cluster cluster : appClusters) { Service service = topology.getService(cluster.getServiceName()); if (service == null) { - log.error("Service " + cluster.getServiceName() - + " not found in Topology, unable to create Application cluster"); + log.error("Service " + cluster.getServiceName() + + " not found in Topology, unable to create Application cluster"); } else { service.addCluster(cluster); - log.info("Application Cluster " + cluster.getClusterId() + " created in CC topology"); + log.info("Application Cluster " + cluster.getClusterId() + + " created in CC topology"); } } TopologyManager.updateTopology(topology); @@ -199,17 +201,19 @@ public class TopologyBuilder { } log.debug("Creating cluster port mappings: [appication-id] " + appId); - for(Cluster cluster : appClusters) { + for (Cluster cluster : appClusters) { String cartridgeType = cluster.getServiceName(); Cartridge cartridge = CloudControllerContext.getInstance().getCartridge(cartridgeType); - if(cartridge == null) { - throw new CloudControllerException("Cartridge not found: [cartridge-type] " + cartridgeType); + if (cartridge == null) { + throw new CloudControllerException( + "Cartridge not found: [cartridge-type] " + cartridgeType); } - for(PortMapping portMapping : cartridge.getPortMappings()) { - ClusterPortMapping clusterPortMapping = new ClusterPortMapping(appId, - cluster.getClusterId(), portMapping.getName(), portMapping.getProtocol(), portMapping.getPort(), - portMapping.getProxyPort()); + for (PortMapping portMapping : cartridge.getPortMappings()) { + ClusterPortMapping clusterPortMapping = + new ClusterPortMapping(appId, cluster.getClusterId(), portMapping.getName(), + portMapping.getProtocol(), portMapping.getPort(), + portMapping.getProxyPort()); CloudControllerContext.getInstance().addClusterPortMapping(clusterPortMapping); log.debug("Cluster port mapping created: " + clusterPortMapping.toString()); } @@ -239,13 +243,13 @@ public class TopologyBuilder { removedClusters.add(aService.removeCluster(aClusterData.getClusterId())); } else { log.warn("Service " + aClusterData.getServiceType() + " not found, " + - "unable to remove Cluster " + aClusterData.getClusterId()); + "unable to remove Cluster " + aClusterData.getClusterId()); } // remove runtime data context.removeClusterContext(aClusterData.getClusterId()); log.info("Removed application [ " + appId + " ]'s Cluster " + - "[ " + aClusterData.getClusterId() + " ] from the topology"); + "[ " + aClusterData.getClusterId() + " ] from the topology"); } // persist runtime data changes CloudControllerContext.getInstance().persist(); @@ -275,22 +279,23 @@ public class TopologyBuilder { Service service = topology.getService(event.getServiceName()); if (service == null) { log.error("Service " + event.getServiceName() + - " not found in Topology, unable to update the cluster status to Created"); + " not found in Topology, unable to update the cluster status to Created"); return; } Cluster cluster = service.getCluster(event.getClusterId()); if (cluster == null) { - log.error("Cluster " + event.getClusterId() + " not found in Topology, unable to update " + - "status to Created"); + log.error("Cluster " + event.getClusterId() + + " not found in Topology, unable to update " + + "status to Created"); return; } ClusterInstance context = cluster.getInstanceContexts(event.getInstanceId()); if (context == null) { log.warn("Cluster Instance Context is not found for [cluster] " + - event.getClusterId() + " [instance-id] " + - event.getInstanceId()); + event.getClusterId() + " [instance-id] " + + event.getInstanceId()); return; } ClusterStatus status = ClusterStatus.Created; @@ -299,25 +304,25 @@ public class TopologyBuilder { log.info("Cluster Created adding status started for" + cluster.getClusterId()); TopologyManager.updateTopology(topology); //publishing data - TopologyEventPublisher.sendClusterResetEvent(event.getAppId(), event.getServiceName(), - event.getClusterId(), event.getInstanceId()); + TopologyEventPublisher + .sendClusterResetEvent(event.getAppId(), event.getServiceName(), + event.getClusterId(), event.getInstanceId()); } else { log.warn(String.format("Cluster state transition is not valid: [cluster-id] %s " + - " [instance-id] %s [current-status] %s [status-requested] %s", - event.getClusterId(), event.getInstanceId(), - context.getStatus(), status)); + " [instance-id] %s [current-status] %s [status-requested] %s", + event.getClusterId(), event.getInstanceId(), + context.getStatus(), status)); } } finally { TopologyManager.releaseWriteLock(); } - } public static void handleClusterInstanceCreated(String serviceType, String clusterId, - String alias, String instanceId, String partitionId, - String networkPartitionId) { + String alias, String instanceId, + String partitionId, String networkPartitionId) { TopologyManager.acquireWriteLock(); @@ -326,20 +331,20 @@ public class TopologyBuilder { Service service = topology.getService(serviceType); if (service == null) { log.error("Service " + serviceType + - " not found in Topology, unable to update the cluster status to Created"); + " not found in Topology, unable to update the cluster status to Created"); return; } Cluster cluster = service.getCluster(clusterId); if (cluster == null) { log.error("Cluster " + clusterId + " not found in Topology, unable to update " + - "status to Created"); + "status to Created"); return; } if (cluster.getInstanceContexts(instanceId) != null) { log.warn("The Instance context for the cluster already exists for [cluster] " + - clusterId + " [instance-id] " + instanceId); + clusterId + " [instance-id] " + instanceId); return; } @@ -350,8 +355,7 @@ public class TopologyBuilder { TopologyManager.updateTopology(topology); ClusterInstanceCreatedEvent clusterInstanceCreatedEvent = - new ClusterInstanceCreatedEvent(serviceType, clusterId, - clusterInstance); + new ClusterInstanceCreatedEvent(serviceType, clusterId, clusterInstance); clusterInstanceCreatedEvent.setPartitionId(partitionId); TopologyEventPublisher.sendClusterInstanceCreatedEvent(clusterInstanceCreatedEvent); @@ -360,21 +364,18 @@ public class TopologyBuilder { } } - public static void handleClusterRemoved(ClusterContext ctxt) { Topology topology = TopologyManager.getTopology(); Service service = topology.getService(ctxt.getCartridgeType()); String deploymentPolicy; if (service == null) { - log.warn(String.format("Service %s does not exist", - ctxt.getCartridgeType())); + log.warn(String.format("Service %s does not exist", ctxt.getCartridgeType())); return; } if (!service.clusterExists(ctxt.getClusterId())) { - log.warn(String.format("Cluster %s does not exist for service %s", - ctxt.getClusterId(), - ctxt.getCartridgeType())); + log.warn(String.format("Cluster %s does not exist for service %s", ctxt.getClusterId(), + ctxt.getCartridgeType())); return; } @@ -407,6 +408,11 @@ public class TopologyBuilder { String lbClusterId = memberContext.getLbClusterId(); long initTime = memberContext.getInitTime(); + String autoscalingReason = + memberContext.getProperties().getProperty(StratosConstants.SCALING_REASON) + .getValue(); + long scalingTime = Long.parseLong( + memberContext.getProperties().getProperty(StratosConstants.SCALING_TIME).getValue()); if (cluster.memberExists(memberId)) { log.warn(String.format("Member %s already exists", memberId)); return; @@ -414,13 +420,24 @@ public class TopologyBuilder { try { TopologyManager.acquireWriteLock(); - Member member = new Member(service.getServiceName(), clusterId, memberId, clusterInstanceId, - networkPartitionId, partitionId, memberContext.getLoadBalancingIPType(), initTime); + Member member = + new Member(service.getServiceName(), clusterId, memberId, clusterInstanceId, + networkPartitionId, partitionId, + memberContext.getLoadBalancingIPType(), initTime); member.setStatus(MemberStatus.Created); member.setLbClusterId(lbClusterId); - member.setProperties(CloudControllerUtil.toJavaUtilProperties(memberContext.getProperties())); + member.setProperties( + CloudControllerUtil.toJavaUtilProperties(memberContext.getProperties())); cluster.addMember(member); TopologyManager.updateTopology(topology); + Long timeStamp = System.currentTimeMillis(); + //publishing to BAM + BAMUsageDataPublisher + .publish(memberContext.getMemberId(), memberContext.getPartition().getId(), + memberContext.getNetworkPartitionId(), memberContext.getClusterId(), + memberContext.getClusterInstanceId(), memberContext.getCartridgeType(), + MemberStatus.Created.toString(), timeStamp, autoscalingReason, + scalingTime, null); } finally { TopologyManager.releaseWriteLock(); } @@ -437,22 +454,19 @@ public class TopologyBuilder { Topology topology = TopologyManager.getTopology(); Service service = topology.getService(memberContext.getCartridgeType()); if (service == null) { - log.warn(String.format("Service %s does not exist", - memberContext.getCartridgeType())); + log.warn(String.format("Service %s does not exist", memberContext.getCartridgeType())); return; } if (!service.clusterExists(memberContext.getClusterId())) { log.warn(String.format("Cluster %s does not exist in service %s", - memberContext.getClusterId(), - memberContext.getCartridgeType())); + memberContext.getClusterId(), memberContext.getCartridgeType())); return; } Member member = service.getCluster(memberContext.getClusterId()). getMember(memberContext.getMemberId()); if (member == null) { - log.warn(String.format("Member %s does not exist", - memberContext.getMemberId())); + log.warn(String.format("Member %s does not exist", memberContext.getMemberId())); return; } @@ -472,38 +486,39 @@ public class TopologyBuilder { // try update lifecycle state if (!member.isStateTransitionValid(MemberStatus.Initialized)) { log.error("Invalid state transition from " + member.getStatus() + " to " + - MemberStatus.Initialized); + MemberStatus.Initialized); return; } else { member.setStatus(MemberStatus.Initialized); log.info("Member status updated to initialized"); TopologyManager.updateTopology(topology); - + Long timeStamp = System.currentTimeMillis(); TopologyEventPublisher.sendMemberInitializedEvent(memberContext); //publishing data - BAMUsageDataPublisher.publish(memberContext.getMemberId(), - memberContext.getPartition().getId(), - memberContext.getNetworkPartitionId(), - memberContext.getClusterId(), - memberContext.getCartridgeType(), - MemberStatus.Initialized.toString(), - null); + BAMUsageDataPublisher + .publish(memberContext.getMemberId(), memberContext.getPartition().getId(), + memberContext.getNetworkPartitionId(), + memberContext.getClusterId(), memberContext.getClusterInstanceId(), + memberContext.getCartridgeType(), + MemberStatus.Initialized.toString(), timeStamp, null, null, null); } } finally { TopologyManager.releaseWriteLock(); } } - private static int findKubernetesServicePort(String clusterId, List<KubernetesService> kubernetesServices, + private static int findKubernetesServicePort(String clusterId, + List<KubernetesService> kubernetesServices, PortMapping portMapping) { for (KubernetesService kubernetesService : kubernetesServices) { if (kubernetesService.getProtocol().equals(portMapping.getProtocol())) { return kubernetesService.getPort(); } } - throw new RuntimeException("Kubernetes service port not found: [cluster-id] " + clusterId + " [port] " - + portMapping.getPort()); + throw new RuntimeException( + "Kubernetes service port not found: [cluster-id] " + clusterId + " [port] " + + portMapping.getPort()); } public static void handleMemberStarted(InstanceStartedEvent instanceStartedEvent) { @@ -512,13 +527,13 @@ public class TopologyBuilder { Service service = topology.getService(instanceStartedEvent.getServiceName()); if (service == null) { log.warn(String.format("Service %s does not exist", - instanceStartedEvent.getServiceName())); + instanceStartedEvent.getServiceName())); return; } if (!service.clusterExists(instanceStartedEvent.getClusterId())) { log.warn(String.format("Cluster %s does not exist in service %s", - instanceStartedEvent.getClusterId(), - instanceStartedEvent.getServiceName())); + instanceStartedEvent.getClusterId(), + instanceStartedEvent.getServiceName())); return; } @@ -526,7 +541,7 @@ public class TopologyBuilder { Member member = cluster.getMember(instanceStartedEvent.getMemberId()); if (member == null) { log.warn(String.format("Member %s does not exist", - instanceStartedEvent.getMemberId())); + instanceStartedEvent.getMemberId())); return; } @@ -535,30 +550,33 @@ public class TopologyBuilder { // try update lifecycle state if (!member.isStateTransitionValid(MemberStatus.Starting)) { log.error("Invalid State Transition from " + member.getStatus() + " to " + - MemberStatus.Starting); + MemberStatus.Starting); return; } else { member.setStatus(MemberStatus.Starting); log.info("member started event adding status started"); TopologyManager.updateTopology(topology); + Long timeStamp = System.currentTimeMillis(); //memberStartedEvent. TopologyEventPublisher.sendMemberStartedEvent(instanceStartedEvent); //publishing data BAMUsageDataPublisher.publish(instanceStartedEvent.getMemberId(), - instanceStartedEvent.getPartitionId(), - instanceStartedEvent.getNetworkPartitionId(), - instanceStartedEvent.getClusterId(), - instanceStartedEvent.getServiceName(), - MemberStatus.Starting.toString(), - null); + instanceStartedEvent.getPartitionId(), + instanceStartedEvent.getNetworkPartitionId(), + instanceStartedEvent.getClusterId(), + instanceStartedEvent.getClusterInstanceId(), + instanceStartedEvent.getServiceName(), + MemberStatus.Starting.toString(), timeStamp, null, + null, null); } } finally { TopologyManager.releaseWriteLock(); } } catch (Exception e) { - String message = String.format("Could not handle member started event: [application-id] %s " + - "[service-name] %s [member-id] %s", instanceStartedEvent.getApplicationId(), + String message = String.format( + "Could not handle member started event: [application-id] %s " + + "[service-name] %s [member-id] %s", instanceStartedEvent.getApplicationId(), instanceStartedEvent.getServiceName(), instanceStartedEvent.getMemberId()); log.warn(message, e); } @@ -569,31 +587,31 @@ public class TopologyBuilder { Service service = topology.getService(instanceActivatedEvent.getServiceName()); if (service == null) { log.warn(String.format("Service %s does not exist", - instanceActivatedEvent.getServiceName())); + instanceActivatedEvent.getServiceName())); return; } Cluster cluster = service.getCluster(instanceActivatedEvent.getClusterId()); if (cluster == null) { log.warn(String.format("Cluster %s does not exist", - instanceActivatedEvent.getClusterId())); + instanceActivatedEvent.getClusterId())); return; } Member member = cluster.getMember(instanceActivatedEvent.getMemberId()); if (member == null) { log.warn(String.format("Member %s does not exist", - instanceActivatedEvent.getMemberId())); + instanceActivatedEvent.getMemberId())); return; } - MemberActivatedEvent memberActivatedEvent = new MemberActivatedEvent( - instanceActivatedEvent.getServiceName(), - instanceActivatedEvent.getClusterId(), - instanceActivatedEvent.getClusterInstanceId(), - instanceActivatedEvent.getMemberId(), - instanceActivatedEvent.getNetworkPartitionId(), - instanceActivatedEvent.getPartitionId()); + MemberActivatedEvent memberActivatedEvent = + new MemberActivatedEvent(instanceActivatedEvent.getServiceName(), + instanceActivatedEvent.getClusterId(), + instanceActivatedEvent.getClusterInstanceId(), + instanceActivatedEvent.getMemberId(), + instanceActivatedEvent.getNetworkPartitionId(), + instanceActivatedEvent.getPartitionId()); // grouping - set grouid //TODO @@ -602,39 +620,48 @@ public class TopologyBuilder { TopologyManager.acquireWriteLock(); // try update lifecycle state if (!member.isStateTransitionValid(MemberStatus.Active)) { - log.error("Invalid state transition from [" + member.getStatus() + "] to [" + MemberStatus.Active + "]"); + log.error("Invalid state transition from [" + member.getStatus() + "] to [" + + MemberStatus.Active + "]"); return; } else { member.setStatus(MemberStatus.Active); // Set member ports try { - Cartridge cartridge = CloudControllerContext.getInstance().getCartridge(service.getServiceName()); + Cartridge cartridge = CloudControllerContext.getInstance().getCartridge( + service.getServiceName()); if (cartridge == null) { - throw new RuntimeException(String.format("Cartridge not found: [cartridge-type] %s", - service.getServiceName())); + throw new RuntimeException( + String.format("Cartridge not found: [cartridge-type] %s", + service.getServiceName())); } Port port; int portValue; List<PortMapping> portMappings = Arrays.asList(cartridge.getPortMappings()); String clusterId = cluster.getClusterId(); - ClusterContext clusterContext = CloudControllerContext.getInstance().getClusterContext(clusterId); - List<KubernetesService> kubernetesServices = clusterContext.getKubernetesServices(); + ClusterContext clusterContext = + CloudControllerContext.getInstance().getClusterContext(clusterId); + List<KubernetesService> kubernetesServices = + clusterContext.getKubernetesServices(); for (PortMapping portMapping : portMappings) { if (kubernetesServices != null) { - portValue = findKubernetesServicePort(clusterId, kubernetesServices, portMapping); + portValue = findKubernetesServicePort(clusterId, kubernetesServices, + portMapping); } else { portValue = portMapping.getPort(); } - port = new Port(portMapping.getProtocol(), portValue, portMapping.getProxyPort()); + port = new Port(portMapping.getProtocol(), portValue, + portMapping.getProxyPort()); member.addPort(port); memberActivatedEvent.addPort(port); } } catch (Exception e) { - String message = String.format("Could not add member ports: [service-name] %s [member-id] %s", - memberActivatedEvent.getServiceName(), memberActivatedEvent.getMemberId()); + String message = String.format( + "Could not add member ports: [service-name] %s [member-id] %s", + memberActivatedEvent.getServiceName(), + memberActivatedEvent.getMemberId()); log.error(message, e); } @@ -644,122 +671,126 @@ public class TopologyBuilder { memberActivatedEvent.setDefaultPublicIP(member.getDefaultPublicIP()); memberActivatedEvent.setMemberPublicIPs(member.getMemberPublicIPs()); TopologyManager.updateTopology(topology); - + Long timeStamp = System.currentTimeMillis(); // Publish member activated event TopologyEventPublisher.sendMemberActivatedEvent(memberActivatedEvent); // Publish statistics data BAMUsageDataPublisher.publish(memberActivatedEvent.getMemberId(), - memberActivatedEvent.getPartitionId(), - memberActivatedEvent.getNetworkPartitionId(), - memberActivatedEvent.getClusterId(), - memberActivatedEvent.getServiceName(), - MemberStatus.Active.toString(), - null); + memberActivatedEvent.getPartitionId(), + memberActivatedEvent.getNetworkPartitionId(), + memberActivatedEvent.getClusterId(), + memberActivatedEvent.getClusterInstanceId(), + memberActivatedEvent.getServiceName(), + MemberStatus.Active.toString(), timeStamp, null, null, + null); } } finally { TopologyManager.releaseWriteLock(); } } - public static void handleMemberReadyToShutdown(InstanceReadyToShutdownEvent instanceReadyToShutdownEvent) + public static void handleMemberReadyToShutdown( + InstanceReadyToShutdownEvent instanceReadyToShutdownEvent) throws InvalidMemberException, InvalidCartridgeTypeException { Topology topology = TopologyManager.getTopology(); Service service = topology.getService(instanceReadyToShutdownEvent.getServiceName()); //update the status of the member if (service == null) { log.warn(String.format("Service %s does not exist", - instanceReadyToShutdownEvent.getServiceName())); + instanceReadyToShutdownEvent.getServiceName())); return; } Cluster cluster = service.getCluster(instanceReadyToShutdownEvent.getClusterId()); if (cluster == null) { log.warn(String.format("Cluster %s does not exist", - instanceReadyToShutdownEvent.getClusterId())); + instanceReadyToShutdownEvent.getClusterId())); return; } - + Long timeStamp = null; Member member = cluster.getMember(instanceReadyToShutdownEvent.getMemberId()); if (member == null) { log.warn(String.format("Member %s does not exist", - instanceReadyToShutdownEvent.getMemberId())); + instanceReadyToShutdownEvent.getMemberId())); return; } - MemberReadyToShutdownEvent memberReadyToShutdownEvent = new MemberReadyToShutdownEvent( - instanceReadyToShutdownEvent.getServiceName(), - instanceReadyToShutdownEvent.getClusterId(), - instanceReadyToShutdownEvent.getClusterInstanceId(), - instanceReadyToShutdownEvent.getMemberId(), - instanceReadyToShutdownEvent.getNetworkPartitionId(), - instanceReadyToShutdownEvent.getPartitionId()); + MemberReadyToShutdownEvent memberReadyToShutdownEvent = + new MemberReadyToShutdownEvent(instanceReadyToShutdownEvent.getServiceName(), + instanceReadyToShutdownEvent.getClusterId(), + instanceReadyToShutdownEvent.getClusterInstanceId(), + instanceReadyToShutdownEvent.getMemberId(), + instanceReadyToShutdownEvent.getNetworkPartitionId(), + instanceReadyToShutdownEvent.getPartitionId()); try { TopologyManager.acquireWriteLock(); if (!member.isStateTransitionValid(MemberStatus.ReadyToShutDown)) { log.error("Invalid State Transition from " + member.getStatus() + " to " + - MemberStatus.ReadyToShutDown); + MemberStatus.ReadyToShutDown); return; } member.setStatus(MemberStatus.ReadyToShutDown); log.info("Member Ready to shut down event adding status started"); TopologyManager.updateTopology(topology); + timeStamp = System.currentTimeMillis(); } finally { TopologyManager.releaseWriteLock(); } TopologyEventPublisher.sendMemberReadyToShutdownEvent(memberReadyToShutdownEvent); //publishing data BAMUsageDataPublisher.publish(instanceReadyToShutdownEvent.getMemberId(), - instanceReadyToShutdownEvent.getPartitionId(), - instanceReadyToShutdownEvent.getNetworkPartitionId(), - instanceReadyToShutdownEvent.getClusterId(), - instanceReadyToShutdownEvent.getServiceName(), - MemberStatus.ReadyToShutDown.toString(), - null); + instanceReadyToShutdownEvent.getPartitionId(), + instanceReadyToShutdownEvent.getNetworkPartitionId(), + instanceReadyToShutdownEvent.getClusterId(), + instanceReadyToShutdownEvent.getClusterInstanceId(), + instanceReadyToShutdownEvent.getServiceName(), + MemberStatus.ReadyToShutDown.toString(), timeStamp, null, + null, null); //termination of particular instance will be handled by autoscaler } - public static void handleMemberMaintenance(InstanceMaintenanceModeEvent instanceMaintenanceModeEvent) + public static void handleMemberMaintenance( + InstanceMaintenanceModeEvent instanceMaintenanceModeEvent) throws InvalidMemberException, InvalidCartridgeTypeException { Topology topology = TopologyManager.getTopology(); Service service = topology.getService(instanceMaintenanceModeEvent.getServiceName()); //update the status of the member if (service == null) { log.warn(String.format("Service %s does not exist", - instanceMaintenanceModeEvent.getServiceName())); + instanceMaintenanceModeEvent.getServiceName())); return; } Cluster cluster = service.getCluster(instanceMaintenanceModeEvent.getClusterId()); if (cluster == null) { log.warn(String.format("Cluster %s does not exist", - instanceMaintenanceModeEvent.getClusterId())); + instanceMaintenanceModeEvent.getClusterId())); return; } Member member = cluster.getMember(instanceMaintenanceModeEvent.getMemberId()); if (member == null) { log.warn(String.format("Member %s does not exist", - instanceMaintenanceModeEvent.getMemberId())); + instanceMaintenanceModeEvent.getMemberId())); return; } - - MemberMaintenanceModeEvent memberMaintenanceModeEvent = new MemberMaintenanceModeEvent( - instanceMaintenanceModeEvent.getServiceName(), - instanceMaintenanceModeEvent.getClusterId(), - instanceMaintenanceModeEvent.getClusterInstanceId(), - instanceMaintenanceModeEvent.getMemberId(), - instanceMaintenanceModeEvent.getNetworkPartitionId(), - instanceMaintenanceModeEvent.getPartitionId()); + MemberMaintenanceModeEvent memberMaintenanceModeEvent = + new MemberMaintenanceModeEvent(instanceMaintenanceModeEvent.getServiceName(), + instanceMaintenanceModeEvent.getClusterId(), + instanceMaintenanceModeEvent.getClusterInstanceId(), + instanceMaintenanceModeEvent.getMemberId(), + instanceMaintenanceModeEvent.getNetworkPartitionId(), + instanceMaintenanceModeEvent.getPartitionId()); try { TopologyManager.acquireWriteLock(); // try update lifecycle state if (!member.isStateTransitionValid(MemberStatus.In_Maintenance)) { - log.error("Invalid State Transition from " + member.getStatus() + " to " - + MemberStatus.In_Maintenance); + log.error("Invalid State Transition from " + member.getStatus() + " to " + + MemberStatus.In_Maintenance); return; } member.setStatus(MemberStatus.In_Maintenance); @@ -790,21 +821,18 @@ public class TopologyBuilder { Service service = topology.getService(serviceName); Properties properties; if (service == null) { - log.warn(String.format("Service %s does not exist", - serviceName)); + log.warn(String.format("Service %s does not exist", serviceName)); return; } Cluster cluster = service.getCluster(clusterId); if (cluster == null) { - log.warn(String.format("Cluster %s does not exist", - clusterId)); + log.warn(String.format("Cluster %s does not exist", clusterId)); return; } Member member = cluster.getMember(memberId); if (member == null) { - log.warn(String.format("Member %s does not exist", - memberId)); + log.warn(String.format("Member %s does not exist", memberId)); return; } @@ -818,11 +846,11 @@ public class TopologyBuilder { } finally { TopologyManager.releaseWriteLock(); } - /* @TODO leftover from grouping_poc*/ + /* @TODO leftover from grouping_poc*/ String groupAlias = null; - TopologyEventPublisher.sendMemberTerminatedEvent(serviceName, clusterId, memberId, - clusterInstanceId, networkPartitionId, - partitionId, properties, groupAlias); + TopologyEventPublisher + .sendMemberTerminatedEvent(serviceName, clusterId, memberId, clusterInstanceId, + networkPartitionId, partitionId, properties, groupAlias); } public static void handleMemberSuspended() { @@ -834,48 +862,52 @@ public class TopologyBuilder { } } - public static void handleClusterActivatedEvent(ClusterStatusClusterActivatedEvent clusterStatusClusterActivatedEvent) { + public static void handleClusterActivatedEvent( + ClusterStatusClusterActivatedEvent clusterStatusClusterActivatedEvent) { Topology topology = TopologyManager.getTopology(); Service service = topology.getService(clusterStatusClusterActivatedEvent.getServiceName()); //update the status of the cluster if (service == null) { log.warn(String.format("Service %s does not exist", - clusterStatusClusterActivatedEvent.getServiceName())); + clusterStatusClusterActivatedEvent.getServiceName())); return; } Cluster cluster = service.getCluster(clusterStatusClusterActivatedEvent.getClusterId()); if (cluster == null) { log.warn(String.format("Cluster %s does not exist", - clusterStatusClusterActivatedEvent.getClusterId())); + clusterStatusClusterActivatedEvent.getClusterId())); return; } String clusterId = cluster.getClusterId(); - ClusterContext clusterContext = CloudControllerContext.getInstance().getClusterContext(clusterId); + ClusterContext clusterContext = + CloudControllerContext.getInstance().getClusterContext(clusterId); if (clusterContext == null) { log.warn("Cluster context not found: [cluster-id] " + clusterId); return; } ClusterInstanceActivatedEvent clusterInstanceActivatedEvent = - new ClusterInstanceActivatedEvent( - clusterStatusClusterActivatedEvent.getAppId(), - clusterStatusClusterActivatedEvent.getServiceName(), - clusterStatusClusterActivatedEvent.getClusterId(), - clusterStatusClusterActivatedEvent.getInstanceId()); + new ClusterInstanceActivatedEvent(clusterStatusClusterActivatedEvent.getAppId(), + clusterStatusClusterActivatedEvent + .getServiceName(), + clusterStatusClusterActivatedEvent.getClusterId(), + clusterStatusClusterActivatedEvent + .getInstanceId()); try { TopologyManager.acquireWriteLock(); List<KubernetesService> kubernetesServices = clusterContext.getKubernetesServices(); cluster.setKubernetesServices(kubernetesServices); clusterInstanceActivatedEvent.setKubernetesServices(kubernetesServices); - ClusterInstance context = cluster.getInstanceContexts(clusterStatusClusterActivatedEvent.getInstanceId()); + ClusterInstance context = + cluster.getInstanceContexts(clusterStatusClusterActivatedEvent.getInstanceId()); if (context == null) { log.warn("Cluster instance context is not found for [cluster] " + - clusterStatusClusterActivatedEvent.getClusterId() + " [instance-id] " + - clusterStatusClusterActivatedEvent.getInstanceId()); + clusterStatusClusterActivatedEvent.getClusterId() + " [instance-id] " + + clusterStatusClusterActivatedEvent.getInstanceId()); return; } ClusterStatus status = ClusterStatus.Active; @@ -887,9 +919,10 @@ public class TopologyBuilder { TopologyEventPublisher.sendClusterActivatedEvent(clusterInstanceActivatedEvent); } else { log.error(String.format("Cluster state transition is not valid: [cluster-id] %s " + - " [instance-id] %s [current-status] %s [status-requested] %s", - clusterStatusClusterActivatedEvent.getClusterId(), clusterStatusClusterActivatedEvent.getInstanceId(), - context.getStatus(), status)); + " [instance-id] %s [current-status] %s [status-requested] %s", + clusterStatusClusterActivatedEvent.getClusterId(), + clusterStatusClusterActivatedEvent.getInstanceId(), + context.getStatus(), status)); return; } } finally { @@ -905,30 +938,30 @@ public class TopologyBuilder { //update the status of the cluster if (service == null) { log.warn(String.format("Service %s does not exist", - clusterInactivateEvent.getServiceName())); + clusterInactivateEvent.getServiceName())); return; } Cluster cluster = service.getCluster(clusterInactivateEvent.getClusterId()); if (cluster == null) { log.warn(String.format("Cluster %s does not exist", - clusterInactivateEvent.getClusterId())); + clusterInactivateEvent.getClusterId())); return; } ClusterInstanceInactivateEvent clusterInactivatedEvent1 = - new ClusterInstanceInactivateEvent( - clusterInactivateEvent.getAppId(), - clusterInactivateEvent.getServiceName(), - clusterInactivateEvent.getClusterId(), - clusterInactivateEvent.getInstanceId()); + new ClusterInstanceInactivateEvent(clusterInactivateEvent.getAppId(), + clusterInactivateEvent.getServiceName(), + clusterInactivateEvent.getClusterId(), + clusterInactivateEvent.getInstanceId()); try { TopologyManager.acquireWriteLock(); - ClusterInstance context = cluster.getInstanceContexts(clusterInactivateEvent.getInstanceId()); + ClusterInstance context = + cluster.getInstanceContexts(clusterInactivateEvent.getInstanceId()); if (context == null) { log.warn("Cluster Instance Context is not found for [cluster] " + - clusterInactivateEvent.getClusterId() + " [instance-id] " + - clusterInactivateEvent.getInstanceId()); + clusterInactivateEvent.getClusterId() + " [instance-id] " + + clusterInactivateEvent.getInstanceId()); return; } ClusterStatus status = ClusterStatus.Inactive; @@ -940,9 +973,10 @@ public class TopologyBuilder { TopologyEventPublisher.sendClusterInactivateEvent(clusterInactivatedEvent1); } else { log.error(String.format("Cluster state transition is not valid: [cluster-id] %s " + - " [instance-id] %s [current-status] %s [status-requested] %s", - clusterInactivateEvent.getClusterId(), clusterInactivateEvent.getInstanceId(), - context.getStatus(), status)); + " [instance-id] %s [current-status] %s [status-requested] %s", + clusterInactivateEvent.getClusterId(), + clusterInactivateEvent.getInstanceId(), context.getStatus(), + status)); return; } } finally { @@ -950,13 +984,15 @@ public class TopologyBuilder { } } - - private static void deleteAppResourcesFromMetadataService(ApplicationInstanceTerminatedEvent event) { + private static void deleteAppResourcesFromMetadataService( + ApplicationInstanceTerminatedEvent event) { try { MetaDataServiceClient metadataClient = new DefaultMetaDataServiceClient(); metadataClient.deleteApplicationProperties(event.getAppId()); } catch (Exception e) { - log.error("Error occurred while deleting the application resources frm metadata service ", e); + log.error( + "Error occurred while deleting the application resources frm metadata service ", + e); } } @@ -970,49 +1006,49 @@ public class TopologyBuilder { //update the status of the cluster if (service == null) { - log.warn(String.format("Service %s does not exist", - event.getServiceName())); + log.warn(String.format("Service %s does not exist", event.getServiceName())); return; } Cluster cluster = service.getCluster(event.getClusterId()); if (cluster == null) { - log.warn(String.format("Cluster %s does not exist", - event.getClusterId())); + log.warn(String.format("Cluster %s does not exist", event.getClusterId())); return; } ClusterInstance context = cluster.getInstanceContexts(event.getInstanceId()); if (context == null) { log.warn("Cluster Instance Context is not found for [cluster] " + - event.getClusterId() + " [instance-id] " + - event.getInstanceId()); + event.getClusterId() + " [instance-id] " + + event.getInstanceId()); return; } ClusterStatus status = ClusterStatus.Terminated; if (context.isStateTransitionValid(status)) { context.setStatus(status); - log.info("Cluster Terminated adding status started for and removing the cluster instance" - + cluster.getClusterId()); + log.info( + "Cluster Terminated adding status started for and removing the cluster instance" + + cluster.getClusterId()); cluster.removeInstanceContext(event.getInstanceId()); TopologyManager.updateTopology(topology); //publishing data - ClusterInstanceTerminatedEvent clusterTerminatedEvent = new ClusterInstanceTerminatedEvent(event.getAppId(), - event.getServiceName(), event.getClusterId(), event.getInstanceId()); + ClusterInstanceTerminatedEvent clusterTerminatedEvent = + new ClusterInstanceTerminatedEvent(event.getAppId(), event.getServiceName(), + event.getClusterId(), + event.getInstanceId()); TopologyEventPublisher.sendClusterTerminatedEvent(clusterTerminatedEvent); } else { log.error(String.format("Cluster state transition is not valid: [cluster-id] %s " + - " [instance-id] %s [current-status] %s [status-requested] %s", - event.getClusterId(), event.getInstanceId(), - context.getStatus(), status)); + " [instance-id] %s [current-status] %s [status-requested] %s", + event.getClusterId(), event.getInstanceId(), + context.getStatus(), status)); return; } } finally { TopologyManager.releaseWriteLock(); } - } public static void handleClusterTerminatingEvent(ClusterStatusClusterTerminatingEvent event) { @@ -1025,14 +1061,16 @@ public class TopologyBuilder { getCluster(event.getClusterId()); if (!cluster.isStateTransitionValid(ClusterStatus.Terminating, event.getInstanceId())) { - log.error("Invalid state transfer from " + cluster.getStatus(event.getInstanceId()) + " to " + + log.error( + "Invalid state transfer from " + cluster.getStatus(event.getInstanceId()) + + " to " + ClusterStatus.Terminating); } ClusterInstance context = cluster.getInstanceContexts(event.getInstanceId()); if (context == null) { log.warn("Cluster Instance Context is not found for [cluster] " + - event.getClusterId() + " [instance-id] " + - event.getInstanceId()); + event.getClusterId() + " [instance-id] " + + event.getInstanceId()); return; } ClusterStatus status = ClusterStatus.Terminating; @@ -1041,22 +1079,26 @@ public class TopologyBuilder { log.info("Cluster Terminating started for " + cluster.getClusterId()); TopologyManager.updateTopology(topology); //publishing data - ClusterInstanceTerminatingEvent clusterTerminaingEvent = new ClusterInstanceTerminatingEvent(event.getAppId(), - event.getServiceName(), event.getClusterId(), event.getInstanceId()); + ClusterInstanceTerminatingEvent clusterTerminaingEvent = + new ClusterInstanceTerminatingEvent(event.getAppId(), + event.getServiceName(), + event.getClusterId(), + event.getInstanceId()); TopologyEventPublisher.sendClusterTerminatingEvent(clusterTerminaingEvent); // Remove kubernetes services if available - ClusterContext clusterContext = - CloudControllerContext.getInstance().getClusterContext(event.getClusterId()); - if(StringUtils.isNotBlank(clusterContext.getKubernetesClusterId())) { + ClusterContext clusterContext = CloudControllerContext.getInstance() + .getClusterContext( + event.getClusterId()); + if (StringUtils.isNotBlank(clusterContext.getKubernetesClusterId())) { KubernetesIaas.removeKubernetesServices(event.getAppId(), event.getClusterId()); } } else { log.error(String.format("Cluster state transition is not valid: [cluster-id] %s " + - " [instance-id] %s [current-status] %s [status-requested] %s", - event.getClusterId(), event.getInstanceId(), - context.getStatus(), status)); + " [instance-id] %s [current-status] %s [status-requested] %s", + event.getClusterId(), event.getInstanceId(), + context.getStatus(), status)); } } finally { TopologyManager.releaseWriteLock();
http://git-wip-us.apache.org/repos/asf/stratos/blob/2fd289b8/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java index 4d51cc1..70cd79b 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java @@ -447,13 +447,14 @@ public class CloudControllerServiceImpl implements CloudControllerService { clusterContext.setVolumes(volumes); } - // Handle member created event - TopologyBuilder.handleMemberCreatedEvent(memberContext); - // Persist member context CloudControllerContext.getInstance().addMemberContext(memberContext); CloudControllerContext.getInstance().persist(); + // Handle member created event + TopologyBuilder.handleMemberCreatedEvent(memberContext); + + // Start instance in a new thread if (log.isDebugEnabled()) { log.debug(String.format("Starting instance creator thread: [cluster] %s [cluster-instance] %s " + http://git-wip-us.apache.org/repos/asf/stratos/blob/2fd289b8/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceUtil.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceUtil.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceUtil.java index 37580eb..97c3ed1 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceUtil.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceUtil.java @@ -66,14 +66,15 @@ public class CloudControllerServiceUtil { memberContext.getClusterId(), memberContext.getNetworkPartitionId(), partitionId, memberContext.getMemberId()); + Long timeStamp = System.currentTimeMillis(); // Publish statistics to BAM BAMUsageDataPublisher.publish(memberContext.getMemberId(), - partitionId, - memberContext.getNetworkPartitionId(), - memberContext.getClusterId(), - memberContext.getCartridgeType(), - MemberStatus.Terminated.toString(), - null); + partitionId, + memberContext.getNetworkPartitionId(), + memberContext.getClusterId(),memberContext.getClusterInstanceId(), + memberContext.getCartridgeType(), + MemberStatus.Terminated.toString(),timeStamp, + null,null,null); // Remove member context CloudControllerContext.getInstance().removeMemberContext(memberContext.getClusterId(), memberContext.getMemberId()); http://git-wip-us.apache.org/repos/asf/stratos/blob/2fd289b8/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/InstanceCreator.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/InstanceCreator.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/InstanceCreator.java index 77cfea2..3730a9d 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/InstanceCreator.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/InstanceCreator.java @@ -27,8 +27,6 @@ import org.apache.stratos.cloud.controller.domain.*; import org.apache.stratos.cloud.controller.exception.CartridgeNotFoundException; import org.apache.stratos.cloud.controller.iaases.Iaas; import org.apache.stratos.cloud.controller.messaging.topology.TopologyBuilder; -import org.apache.stratos.cloud.controller.statistics.publisher.BAMUsageDataPublisher; -import org.apache.stratos.messaging.domain.topology.MemberStatus; import java.util.concurrent.locks.Lock; @@ -37,114 +35,115 @@ import java.util.concurrent.locks.Lock; */ public class InstanceCreator implements Runnable { - private static final Log log = LogFactory.getLog(InstanceCreator.class); - - private MemberContext memberContext; - private IaasProvider iaasProvider; - private byte[] payload; - - public InstanceCreator(MemberContext memberContext, IaasProvider iaasProvider, byte[] payload) { - this.memberContext = memberContext; - this.iaasProvider = iaasProvider; - this.payload = payload; - } - - @Override - public void run() { - Lock lock = null; - try { - lock = CloudControllerContext.getInstance().acquireMemberContextWriteLock(); - - String clusterId = memberContext.getClusterId(); - Partition partition = memberContext.getPartition(); - ClusterContext clusterContext = CloudControllerContext.getInstance().getClusterContext(clusterId); - Iaas iaas = iaasProvider.getIaas(); - - if (log.isDebugEnabled()) { - - log.debug(String.format("Payload passed to instance created, [member] %s [payload] %s", - memberContext.getMemberId(), new String(payload))); - } - memberContext = startInstance(iaas, memberContext, payload); - - if (log.isInfoEnabled()) { - log.info(String.format("Instance started successfully: [cartridge-type] %s [cluster-id] %s [instance-id] %s " + - "[default-private-ip] %s [default-public-ip] %s", - memberContext.getCartridgeType(), memberContext.getClusterId(), - memberContext.getInstanceId(), memberContext.getDefaultPrivateIP(), - memberContext.getDefaultPublicIP())); - } - - if (clusterContext.isVolumeRequired()) { - attachVolumes(iaas, clusterContext, memberContext); - } - - // Allocate IP addresses - iaas.allocateIpAddresses(clusterId, memberContext, partition); - - // Update topology - TopologyBuilder.handleMemberInitializedEvent(memberContext); - - // Publish instance creation statistics to BAM - BAMUsageDataPublisher.publish( - memberContext.getMemberId(), - memberContext.getPartition().getId(), - memberContext.getNetworkPartitionId(), - memberContext.getClusterId(), - memberContext.getCartridgeType(), - MemberStatus.Initialized.toString(), - memberContext.getInstanceMetadata()); - } catch (Exception e) { - String message = String.format("Could not start instance: [cartridge-type] %s [cluster-id] %s", - memberContext.getCartridgeType(), memberContext.getClusterId()); - log.error(message, e); - } finally { - if (lock != null) { - CloudControllerContext.getInstance().releaseWriteLock(lock); - } - } - } - - private MemberContext startInstance(Iaas iaas, MemberContext memberContext, byte[] payload) throws CartridgeNotFoundException { - memberContext = iaas.startInstance(memberContext, payload); - - // Validate instance id - String instanceId = memberContext.getInstanceId(); - if (StringUtils.isBlank(instanceId)) { - String msg = String.format("Instance id not found in started member: [cartridge-type] %s [member-id] %s", - memberContext.getCartridgeType(), memberContext.getMemberId()); - log.error(msg); - throw new IllegalStateException(msg); - } - - // Update member context and persist changes - CloudControllerContext.getInstance().updateMemberContext(memberContext); - CloudControllerContext.getInstance().persist(); - - if (log.isDebugEnabled()) { - log.debug(String.format("Member context updated: [application] %s [cartridge] %s [member] %s", - memberContext.getApplicationId(), memberContext.getCartridgeType(), memberContext.getMemberId())); - } - - return memberContext; - } - - public void attachVolumes(Iaas iaas, ClusterContext clusterContext, MemberContext memberContext) { - // attach volumes - if (clusterContext.isVolumeRequired()) { - // remove region prefix - if (clusterContext.getVolumes() != null) { - for (Volume volume : clusterContext.getVolumes()) { - try { - iaas.attachVolume(memberContext.getInstanceId(), volume.getId(), volume.getDevice()); - } catch (Exception e) { - // continue without throwing an exception, since - // there is an instance already running - log.error(String.format("Could not attach volume, [instance] %s [volume] %s ", - memberContext.getInstanceId(), volume.toString()), e); - } - } - } - } - } + private static final Log log = LogFactory.getLog(InstanceCreator.class); + + private MemberContext memberContext; + private IaasProvider iaasProvider; + private byte[] payload; + + public InstanceCreator(MemberContext memberContext, IaasProvider iaasProvider, byte[] payload) { + this.memberContext = memberContext; + this.iaasProvider = iaasProvider; + this.payload = payload; + } + + @Override public void run() { + Lock lock = null; + try { + lock = CloudControllerContext.getInstance().acquireMemberContextWriteLock(); + + String clusterId = memberContext.getClusterId(); + Partition partition = memberContext.getPartition(); + ClusterContext clusterContext = + CloudControllerContext.getInstance().getClusterContext(clusterId); + Iaas iaas = iaasProvider.getIaas(); + + if (log.isDebugEnabled()) { + + log.debug(String.format( + "Payload passed to instance created, [member] %s [payload] %s", + memberContext.getMemberId(), new String(payload))); + } + memberContext = startInstance(iaas, memberContext, payload); + + if (log.isInfoEnabled()) { + log.info(String.format( + "Instance started successfully: [cartridge-type] %s [cluster-id] %s [instance-id] %s " + + "[default-private-ip] %s [default-public-ip] %s", + memberContext.getCartridgeType(), memberContext.getClusterId(), + memberContext.getInstanceId(), memberContext.getDefaultPrivateIP(), + memberContext.getDefaultPublicIP())); + } + + if (clusterContext.isVolumeRequired()) { + attachVolumes(iaas, clusterContext, memberContext); + } + + // Allocate IP addresses + iaas.allocateIpAddresses(clusterId, memberContext, partition); + + // Update topology + TopologyBuilder.handleMemberInitializedEvent(memberContext); + + } catch (Exception e) { + String message = + String.format("Could not start instance: [cartridge-type] %s [cluster-id] %s", + memberContext.getCartridgeType(), memberContext.getClusterId()); + log.error(message, e); + } finally { + if (lock != null) { + CloudControllerContext.getInstance().releaseWriteLock(lock); + } + } + } + + private MemberContext startInstance(Iaas iaas, MemberContext memberContext, byte[] payload) + throws CartridgeNotFoundException { + memberContext = iaas.startInstance(memberContext, payload); + + // Validate instance id + String instanceId = memberContext.getInstanceId(); + if (StringUtils.isBlank(instanceId)) { + String msg = String.format( + "Instance id not found in started member: [cartridge-type] %s [member-id] %s", + memberContext.getCartridgeType(), memberContext.getMemberId()); + log.error(msg); + throw new IllegalStateException(msg); + } + + // Update member context and persist changes + CloudControllerContext.getInstance().updateMemberContext(memberContext); + CloudControllerContext.getInstance().persist(); + + if (log.isDebugEnabled()) { + log.debug(String.format( + "Member context updated: [application] %s [cartridge] %s [member] %s", + memberContext.getApplicationId(), memberContext.getCartridgeType(), + memberContext.getMemberId())); + } + + return memberContext; + } + + public void attachVolumes(Iaas iaas, ClusterContext clusterContext, + MemberContext memberContext) { + // attach volumes + if (clusterContext.isVolumeRequired()) { + // remove region prefix + if (clusterContext.getVolumes() != null) { + for (Volume volume : clusterContext.getVolumes()) { + try { + iaas.attachVolume(memberContext.getInstanceId(), volume.getId(), + volume.getDevice()); + } catch (Exception e) { + // continue without throwing an exception, since + // there is an instance already running + log.error( + String.format("Could not attach volume, [instance] %s [volume] %s ", + memberContext.getInstanceId(), volume.toString()), e); + } + } + } + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/2fd289b8/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/BAMUsageDataPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/BAMUsageDataPublisher.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/BAMUsageDataPublisher.java index d5aabbd..26106d7 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/BAMUsageDataPublisher.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/BAMUsageDataPublisher.java @@ -56,8 +56,12 @@ public class BAMUsageDataPublisher { String partitionId, String networkId, String clusterId, + String clusterInstanceId, String serviceName, String status, + Long timeStamp, + String autoscalingReason, + Long scalingTime, InstanceMetadata metadata) { if (!CloudControllerConfig.getInstance().isBAMDataPublisherEnabled()) { return; @@ -75,20 +79,24 @@ public class BAMUsageDataPublisher { return; } } - MemberContext memberContext = CloudControllerContext.getInstance().getMemberContextOfMemberId(memberId); String cartridgeType = memberContext.getCartridgeType(); Cartridge cartridge = CloudControllerContext.getInstance().getCartridge(cartridgeType); - + String instanceType = CloudControllerContext.getInstance().getIaasProviderOfPartition(cartridgeType, partitionId).getProperty(CloudControllerConstants.INSTANCE_TYPE); //Construct the data to be published List<Object> payload = new ArrayList<Object>(); // Payload values + payload.add(timeStamp); payload.add(memberId); payload.add(serviceName); payload.add(clusterId); + payload.add(clusterInstanceId); payload.add(handleNull(memberContext.getLbClusterId())); payload.add(handleNull(partitionId)); payload.add(handleNull(networkId)); + payload.add(handleNull(instanceType)); + payload.add(handleNull(autoscalingReason)); + payload.add(handleNull(scalingTime)); if (cartridge != null) { payload.add(handleNull(String.valueOf(cartridge.isMultiTenant()))); } else { @@ -151,12 +159,17 @@ public class BAMUsageDataPublisher { streamDefinition.setDescription("Instances booted up by the Cloud Controller"); // Payload definition List<Attribute> payloadData = new ArrayList<Attribute>(); + payloadData.add(new Attribute(CloudControllerConstants.TIME_STAMP, AttributeType.LONG)); payloadData.add(new Attribute(CloudControllerConstants.MEMBER_ID_COL, AttributeType.STRING)); payloadData.add(new Attribute(CloudControllerConstants.CARTRIDGE_TYPE_COL, AttributeType.STRING)); payloadData.add(new Attribute(CloudControllerConstants.CLUSTER_ID_COL, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.CLUSTER_INSTANCE_ID_COL, AttributeType.STRING)); payloadData.add(new Attribute(CloudControllerConstants.LB_CLUSTER_ID_COL, AttributeType.STRING)); payloadData.add(new Attribute(CloudControllerConstants.PARTITION_ID_COL, AttributeType.STRING)); payloadData.add(new Attribute(CloudControllerConstants.NETWORK_ID_COL, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.INSTANCE_TYPE, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.SCALING_REASON, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.SCALING_TIME, AttributeType.LONG)); payloadData.add(new Attribute(CloudControllerConstants.IS_MULTI_TENANT_COL, AttributeType.STRING)); payloadData.add(new Attribute(CloudControllerConstants.IAAS_COL, AttributeType.STRING)); payloadData.add(new Attribute(CloudControllerConstants.STATUS_COL, AttributeType.STRING)); @@ -198,7 +211,7 @@ public class BAMUsageDataPublisher { dataPublisher.addStreamDefinition(streamDefinition); } catch (Exception e) { String msg = "Unable to create a data publisher to " + bamServerUrl + - ". Usage Agent will not function properly. "; + ". Usage Agent will not function properly. "; log.error(msg, e); throw new CloudControllerException(msg, e); } @@ -210,4 +223,11 @@ public class BAMUsageDataPublisher { } return val; } + + private static Long handleNull(Long val) { + if (val == null) { + return -1L; + } + return val; + } } http://git-wip-us.apache.org/repos/asf/stratos/blob/2fd289b8/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerConstants.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerConstants.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerConstants.java index 5e6115f..5730475 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerConstants.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerConstants.java @@ -103,6 +103,7 @@ public final class CloudControllerConstants { public static final String MEMBER_ID_COL = "memberId"; public static final String CARTRIDGE_TYPE_COL = "cartridgeType"; public static final String CLUSTER_ID_COL = "clusterId"; + public static final String CLUSTER_INSTANCE_ID_COL = "clusterInstanceId"; public static final String PARTITION_ID_COL = "partitionId"; public static final String NETWORK_ID_COL = "networkId"; public static final String ALIAS_COL = "alias"; @@ -122,6 +123,9 @@ public final class CloudControllerConstants { public static final String PRIV_IP_COL = "privateIPAddresses"; public static final String PUB_IP_COL = "publicIPAddresses"; public static final String ALLOCATE_IP_COL = "allocateIPAddresses"; + public static final String TIME_STAMP = "timeStamp"; + public static final String SCALING_REASON = "scalingReason"; + public static final String SCALING_TIME = "scalingTime"; /** * Properties @@ -148,59 +152,59 @@ public final class CloudControllerConstants { * XPath expressions */ public static final String IAAS_PROVIDER_XPATH = "/" - + CLOUD_CONTROLLER_ELEMENT + "/" + IAAS_PROVIDERS_ELEMENT + "/" - + IAAS_PROVIDER_ELEMENT; + + CLOUD_CONTROLLER_ELEMENT + "/" + IAAS_PROVIDERS_ELEMENT + "/" + + IAAS_PROVIDER_ELEMENT; public static final String PARTITION_XPATH = "/" + CLOUD_CONTROLLER_ELEMENT - + "/" + PARTITIONS_ELEMENT + "/" + PARTITION_ELEMENT; + + "/" + PARTITIONS_ELEMENT + "/" + PARTITION_ELEMENT; public static final String REGION_XPATH = "/" + CLOUD_CONTROLLER_ELEMENT - + "/" + IAAS_PROVIDERS_ELEMENT + "/" + IAAS_PROVIDER_ELEMENT + "/" - + REGION_ELEMENT; + + "/" + IAAS_PROVIDERS_ELEMENT + "/" + IAAS_PROVIDER_ELEMENT + "/" + + REGION_ELEMENT; public static final String ZONE_XPATH = "/" + CLOUD_CONTROLLER_ELEMENT - + "/" + IAAS_PROVIDERS_ELEMENT + "/" + IAAS_PROVIDER_ELEMENT + "/" - + REGION_ELEMENT + "/" + ZONE_ELEMENT; + + "/" + IAAS_PROVIDERS_ELEMENT + "/" + IAAS_PROVIDER_ELEMENT + "/" + + REGION_ELEMENT + "/" + ZONE_ELEMENT; public static final String HOST_XPATH = "/" + CLOUD_CONTROLLER_ELEMENT - + "/" + IAAS_PROVIDERS_ELEMENT + "/" + IAAS_PROVIDER_ELEMENT + "/" - + REGION_ELEMENT + "/" + ZONE_ELEMENT + "/" + HOST_ELEMENT; + + "/" + IAAS_PROVIDERS_ELEMENT + "/" + IAAS_PROVIDER_ELEMENT + "/" + + REGION_ELEMENT + "/" + ZONE_ELEMENT + "/" + HOST_ELEMENT; public static final String PROPERTY_ELEMENT_XPATH = "/" + PROPERTY_ELEMENT; public static final String IMAGE_ID_ELEMENT_XPATH = "/" + IMAGE_ID_ELEMENT; public static final String SCALE_UP_ORDER_ELEMENT_XPATH = "/" - + SCALE_UP_ORDER_ELEMENT; + + SCALE_UP_ORDER_ELEMENT; public static final String SCALE_DOWN_ORDER_ELEMENT_XPATH = "/" - + SCALE_DOWN_ORDER_ELEMENT; + + SCALE_DOWN_ORDER_ELEMENT; public static final String PROVIDER_ELEMENT_XPATH = "/" + PROPERTY_ELEMENT; public static final String IDENTITY_ELEMENT_XPATH = "/" + IDENTITY_ELEMENT; public static final String CREDENTIAL_ELEMENT_XPATH = "/" - + CREDENTIAL_ELEMENT; + + CREDENTIAL_ELEMENT; public static final String SERVICES_ELEMENT_XPATH = "/" + SERVICES_ELEMENT - + "/" + SERVICE_ELEMENT; + + "/" + SERVICE_ELEMENT; public static final String SERVICE_ELEMENT_XPATH = "/" + SERVICE_ELEMENT; public static final String CARTRIDGE_ELEMENT_XPATH = "/" - + CARTRIDGE_ELEMENT; + + CARTRIDGE_ELEMENT; public static final String PAYLOAD_ELEMENT_XPATH = "/" + PAYLOAD_ELEMENT; public static final String HOST_ELEMENT_XPATH = "/" + HOST_ELEMENT; public static final String CARTRIDGES_ELEMENT_XPATH = "/" - + CARTRIDGES_ELEMENT + "/" + CARTRIDGE_ELEMENT; + + CARTRIDGES_ELEMENT + "/" + CARTRIDGE_ELEMENT; public static final String IAAS_PROVIDER_ELEMENT_XPATH = "/" - + IAAS_PROVIDER_ELEMENT; + + IAAS_PROVIDER_ELEMENT; public static final String DEPLOYMENT_ELEMENT_XPATH = "/" - + DEPLOYMENT_ELEMENT; + + DEPLOYMENT_ELEMENT; public static final String PORT_MAPPING_ELEMENT_XPATH = "/" - + PORT_MAPPING_ELEMENT; + + PORT_MAPPING_ELEMENT; public static final String APP_TYPES_ELEMENT_XPATH = "/" - + APP_TYPES_ELEMENT; + + APP_TYPES_ELEMENT; public static final String DATA_PUBLISHER_XPATH = "/" - + CLOUD_CONTROLLER_ELEMENT + "/" + DATA_PUBLISHER_ELEMENT; + + CLOUD_CONTROLLER_ELEMENT + "/" + DATA_PUBLISHER_ELEMENT; public static final String TOPOLOGY_SYNC_XPATH = "/" - + CLOUD_CONTROLLER_ELEMENT + "/" + TOPOLOGY_SYNC_ELEMENT; + + CLOUD_CONTROLLER_ELEMENT + "/" + TOPOLOGY_SYNC_ELEMENT; public static final String DATA_PUBLISHER_CRON_XPATH = "/" - + CLOUD_CONTROLLER_ELEMENT + "/" + CRON_ELEMENT; + + CLOUD_CONTROLLER_ELEMENT + "/" + CRON_ELEMENT; public static final String BAM_SERVER_ADMIN_USERNAME_XPATH = "/" - + CLOUD_CONTROLLER_ELEMENT + "/" - + BAM_SERVER_ADMIN_USERNAME_ELEMENT; + + CLOUD_CONTROLLER_ELEMENT + "/" + + BAM_SERVER_ADMIN_USERNAME_ELEMENT; public static final String BAM_SERVER_ADMIN_PASSWORD_XPATH = "/" - + CLOUD_CONTROLLER_ELEMENT + "/" - + BAM_SERVER_ADMIN_PASSWORD_ELEMENT; + + CLOUD_CONTROLLER_ELEMENT + "/" + + BAM_SERVER_ADMIN_PASSWORD_ELEMENT; // public static final String CASSANDRA_HOST_ADDRESS_XPATH = // "/"+CLOUD_CONTROLLER_ELEMENT+ // "/"+CASSANDRA_HOST_ADDRESS; @@ -240,15 +244,15 @@ public final class CloudControllerConstants { * Directories */ public static final String SERVICES_DIR = CarbonUtils.getCarbonRepository() - + File.separator + "services" + File.separator; + + File.separator + "services" + File.separator; /** * Topology sync related constants */ public static final String TOPOLOGY_FILE_PATH = CarbonUtils - .getCarbonConfigDirPath() - + File.separator - + "service-topology.conf"; + .getCarbonConfigDirPath() + + File.separator + + "service-topology.conf"; public static final String TOPOLOGY_SYNC_CRON = "1 * * * * ? *"; public static final String TOPOLOGY_SYNC_TASK_NAME = "TOPOLOGY_SYNC_TASK"; public static final String TOPOLOGY_SYNC_TASK_TYPE = "TOPOLOGY_SYNC_TASK_TYPE";
