http://git-wip-us.apache.org/repos/asf/stratos/blob/4b864473/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
----------------------------------------------------------------------
diff --git
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
index 0e5c4a5..ecd2728 100644
---
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
+++
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
@@ -31,6 +31,7 @@ import
org.apache.stratos.cloud.controller.messaging.publisher.TopologyEventPubl
import
org.apache.stratos.cloud.controller.statistics.publisher.BAMUsageDataPublisher;
import org.apache.stratos.cloud.controller.util.CloudControllerUtil;
import org.apache.stratos.common.Property;
+import org.apache.stratos.kubernetes.client.KubernetesConstants;
import org.apache.stratos.messaging.domain.application.ClusterDataHolder;
import org.apache.stratos.messaging.domain.instance.ClusterInstance;
import org.apache.stratos.messaging.domain.topology.*;
@@ -44,6 +45,9 @@ import org.apache.stratos.messaging.event.topology.*;
import
org.apache.stratos.metadata.client.defaults.DefaultMetaDataServiceClient;
import org.apache.stratos.metadata.client.defaults.MetaDataServiceClient;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
import java.util.*;
/**
@@ -67,11 +71,8 @@ public class TopologyBuilder {
TopologyManager.acquireWriteLock();
for (Cartridge cartridge : cartridgeList) {
if (!topology.serviceExists(cartridge.getType())) {
-
- ServiceType serviceType = cartridge.isMultiTenant() ?
ServiceType.MultiTenant :
- ServiceType.SingleTenant;
- service = new Service(cartridge.getType(), serviceType,
cartridge.getUuid());
-
+ ServiceType serviceType = cartridge.isMultiTenant() ?
ServiceType.MultiTenant : ServiceType.SingleTenant;
+ service = new Service(cartridge.getType(), serviceType);
Properties properties = new Properties();
try {
@@ -122,26 +123,26 @@ public class TopologyBuilder {
Topology topology = TopologyManager.getTopology();
for (Cartridge cartridge : cartridgeList) {
- Service service = topology.getService(cartridge.getUuid());
+ Service service = topology.getService(cartridge.getType());
if (service == null) {
- log.warn("Cartridge does not exist [cartridge] " + cartridge);
+ log.warn("Cartridge does not exist [cartidge] " + cartridge);
return;
}
if (service.getClusters().size() == 0) {
- if (topology.serviceExists(cartridge.getUuid())) {
+ if (topology.serviceExists(cartridge.getType())) {
try {
TopologyManager.acquireWriteLock();
- topology.removeService(cartridge.getUuid());
+ topology.removeService(cartridge.getType());
TopologyManager.updateTopology(topology);
} finally {
TopologyManager.releaseWriteLock();
}
TopologyEventPublisher.sendServiceRemovedEvent(cartridgeList);
} else {
- log.warn(String.format("Service %s does not exist..",
cartridge.getUuid()));
+ log.warn(String.format("Service %s does not exist..",
cartridge.getType()));
}
} else {
- log.warn("Subscription already exists. Hence not removing the
service:" + cartridge.getUuid()
+ log.warn("Subscription already exists. Hence not removing the
service:" + cartridge.getType()
+ " from the topology");
}
}
@@ -149,7 +150,7 @@ public class TopologyBuilder {
- public static void handleApplicationClustersCreated(String appUuid,
List<Cluster> appClusters) {
+ public static void handleApplicationClustersCreated(String appId,
List<Cluster> appClusters) {
TopologyManager.acquireWriteLock();
@@ -170,16 +171,16 @@ public class TopologyBuilder {
TopologyManager.releaseWriteLock();
}
- log.debug("Creating cluster port mappings: [application-id] " +
appUuid);
+ log.debug("Creating cluster port mappings: [appication-id] " + appId);
for (Cluster cluster : appClusters) {
- String cartridgeUuid = cluster.getServiceName();
- Cartridge cartridge =
CloudControllerContext.getInstance().getCartridge(cartridgeUuid);
+ String cartridgeType = cluster.getServiceName();
+ Cartridge cartridge =
CloudControllerContext.getInstance().getCartridge(cartridgeType);
if (cartridge == null) {
- throw new CloudControllerException("Cartridge not found:
[cartridge-uuid] " + cartridgeUuid);
+ throw new CloudControllerException("Cartridge not found:
[cartridge-type] " + cartridgeType);
}
for (PortMapping portMapping : cartridge.getPortMappings()) {
- ClusterPortMapping clusterPortMapping = new
ClusterPortMapping(appUuid,
+ ClusterPortMapping clusterPortMapping = new
ClusterPortMapping(appId,
cluster.getClusterId(), portMapping.getName(),
portMapping.getProtocol(), portMapping.getPort(),
portMapping.getProxyPort());
if (portMapping.getKubernetesPortType() != null) {
@@ -190,865 +191,891 @@ public class TopologyBuilder {
}
}
- // Persist cluster port mappings
- CloudControllerContext.getInstance().persist();
+ // Persist cluster port mappings
+ CloudControllerContext.getInstance().persist();
- // Send application clusters created event
- TopologyEventPublisher.sendApplicationClustersCreated(appUuid,
appClusters);
- }
+ // Send application clusters created event
+ TopologyEventPublisher.sendApplicationClustersCreated(appId,
appClusters);
+ }
- public static void handleApplicationClustersRemoved (String appId,
- Set < ClusterDataHolder > clusterData){
- TopologyManager.acquireWriteLock();
+ public static void handleApplicationClustersRemoved(String appId,
+ Set<ClusterDataHolder>
clusterData) {
+ TopologyManager.acquireWriteLock();
- List<Cluster> removedClusters = new ArrayList<Cluster>();
- CloudControllerContext context =
CloudControllerContext.getInstance();
- try {
- Topology topology = TopologyManager.getTopology();
-
- if (clusterData != null) {
- // remove clusters from CC topology model and remove
runtime information
- for (ClusterDataHolder aClusterData : clusterData) {
- Service aService =
topology.getService(aClusterData.getServiceUuid());
- if (aService != null) {
-
removedClusters.add(aService.removeCluster(aClusterData.getClusterId()));
- } else {
- log.warn("Service " +
aClusterData.getServiceType() + " not found, " +
- "unable to remove Cluster " +
aClusterData.getClusterId());
- }
- // remove runtime data
-
context.removeClusterContext(aClusterData.getClusterId());
+ List<Cluster> removedClusters = new ArrayList<Cluster>();
+ CloudControllerContext context = CloudControllerContext.getInstance();
+ try {
+ Topology topology = TopologyManager.getTopology();
- log.info("Removed application [ " + appId + " ]'s
Cluster " +
- "[ " + aClusterData.getClusterId() + " ] from
the topology");
+ if (clusterData != null) {
+ // remove clusters from CC topology model and remove runtime
information
+ for (ClusterDataHolder aClusterData : clusterData) {
+ Service aService =
topology.getService(aClusterData.getServiceType());
+ if (aService != null) {
+
removedClusters.add(aService.removeCluster(aClusterData.getClusterId()));
+ } else {
+ log.warn("Service " + aClusterData.getServiceType() +
" not found, " +
+ "unable to remove Cluster " +
aClusterData.getClusterId());
}
- // persist runtime data changes
- CloudControllerContext.getInstance().persist();
- } else {
- log.info("No cluster data found for application " + appId
+ " to remove");
+ // remove runtime data
+ context.removeClusterContext(aClusterData.getClusterId());
+
+ log.info("Removed application [ " + appId + " ]'s Cluster
" +
+ "[ " + aClusterData.getClusterId() + " ] from the
topology");
}
+ // persist runtime data changes
+ CloudControllerContext.getInstance().persist();
+ } else {
+ log.info("No cluster data found for application " + appId + "
to remove");
+ }
- TopologyManager.updateTopology(topology);
+ TopologyManager.updateTopology(topology);
- } finally {
- TopologyManager.releaseWriteLock();
+ } finally {
+ TopologyManager.releaseWriteLock();
+ }
+
+ // Remove cluster port mappings of application
+ CloudControllerContext.getInstance().removeClusterPortMappings(appId);
+ CloudControllerContext.getInstance().persist();
+
+ TopologyEventPublisher.sendApplicationClustersRemoved(appId,
clusterData);
+
+ }
+
+ public static void handleClusterReset(ClusterStatusClusterResetEvent
event) {
+ TopologyManager.acquireWriteLock();
+
+ try {
+ Topology topology = TopologyManager.getTopology();
+ Service service = topology.getService(event.getServiceName());
+ if (service == null) {
+ log.error("Service " + event.getServiceName() +
+ " not found in Topology, unable to update the cluster
status to Created");
+ return;
}
- // Remove cluster port mappings of application
-
CloudControllerContext.getInstance().removeClusterPortMappings(appId);
- CloudControllerContext.getInstance().persist();
+ Cluster cluster = service.getCluster(event.getClusterId());
+ if (cluster == null) {
+ log.error("Cluster " + event.getClusterId() + " not found in
Topology, unable to update " +
+ "status to Created");
+ return;
+ }
- TopologyEventPublisher.sendApplicationClustersRemoved(appId,
clusterData);
+ ClusterInstance context =
cluster.getInstanceContexts(event.getInstanceId());
+ if (context == null) {
+ log.warn("Cluster Instance Context is not found for [cluster]
" +
+ event.getClusterId() + " [instance-id] " +
+ event.getInstanceId());
+ return;
+ }
+ ClusterStatus status = ClusterStatus.Created;
+ if (context.isStateTransitionValid(status)) {
+ context.setStatus(status);
+ log.info("Cluster Created adding status started for" +
cluster.getClusterId());
+ TopologyManager.updateTopology(topology);
+ //publishing data
+ TopologyEventPublisher.sendClusterResetEvent(event.getAppId(),
event.getServiceName(),
+ event.getClusterId(), event.getInstanceId());
+ } else {
+ log.warn(String.format("Cluster state transition is not valid:
[cluster-id] %s " +
+ " [instance-id] %s [current-status] %s
[status-requested] %s",
+ event.getClusterId(), event.getInstanceId(),
+ context.getStatus(), status));
+ }
+ } finally {
+ TopologyManager.releaseWriteLock();
}
- public static void handleClusterReset (ClusterStatusClusterResetEvent
event){
- TopologyManager.acquireWriteLock();
- try {
- Topology topology = TopologyManager.getTopology();
- Service service = topology.getService(event.getServiceName());
- if (service == null) {
- log.error("Service " + event.getServiceName() +
- " not found in Topology, unable to update the
cluster status to Created");
- return;
- }
+ }
- Cluster cluster = service.getCluster(event.getClusterId());
- if (cluster == null) {
- log.error("Cluster " + event.getClusterId() + " not found
in Topology, unable to update " +
- "status to Created");
- return;
- }
+ public static void handleClusterInstanceCreated(String serviceType, String
clusterId,
+ String alias, String
instanceId, String partitionId,
+ String networkPartitionId)
{
- ClusterInstance context =
cluster.getInstanceContexts(event.getInstanceId());
- if (context == null) {
- log.warn("Cluster Instance Context is not found for
[cluster] " +
- event.getClusterId() + " [instance-id] " +
- event.getInstanceId());
- return;
- }
- ClusterStatus status = ClusterStatus.Created;
- if (context.isStateTransitionValid(status)) {
- context.setStatus(status);
- log.info("Cluster Created adding status started for" +
cluster.getClusterId());
- TopologyManager.updateTopology(topology);
- //publishing data
-
TopologyEventPublisher.sendClusterResetEvent(event.getAppId(),
event.getServiceName(),
- event.getClusterId(), event.getInstanceId());
- } else {
- log.warn(String.format("Cluster state transition is not
valid: [cluster-id] %s " +
- " [instance-id] %s [current-status] %s
[status-requested] %s",
- event.getClusterId(), event.getInstanceId(),
- context.getStatus(), status));
- }
+ TopologyManager.acquireWriteLock();
- } finally {
- TopologyManager.releaseWriteLock();
+ try {
+ Topology topology = TopologyManager.getTopology();
+ Service service = topology.getService(serviceType);
+ if (service == null) {
+ log.error("Service " + serviceType +
+ " not found in Topology, unable to update the cluster
status to Created");
+ return;
+ }
+
+ Cluster cluster = service.getCluster(clusterId);
+ if (cluster == null) {
+ log.error("Cluster " + clusterId + " not found in Topology,
unable to update " +
+ "status to Created");
+ return;
+ }
+
+ if (cluster.getInstanceContexts(instanceId) != null) {
+ log.warn("The Instance context for the cluster already exists
for [cluster] " +
+ clusterId + " [instance-id] " + instanceId);
+ return;
}
+ ClusterInstance clusterInstance = new ClusterInstance(alias,
clusterId, instanceId);
+ clusterInstance.setNetworkPartitionId(networkPartitionId);
+ clusterInstance.setPartitionId(partitionId);
+ cluster.addInstanceContext(instanceId, clusterInstance);
+ TopologyManager.updateTopology(topology);
+
+ ClusterInstanceCreatedEvent clusterInstanceCreatedEvent =
+ new ClusterInstanceCreatedEvent(serviceType, clusterId,
+ clusterInstance);
+ clusterInstanceCreatedEvent.setPartitionId(partitionId);
+
TopologyEventPublisher.sendClusterInstanceCreatedEvent(clusterInstanceCreatedEvent);
+ } finally {
+ TopologyManager.releaseWriteLock();
}
+ }
- public static void handleClusterInstanceCreated (String serviceUuid,
String clusterId,
- String alias, String instanceId, String partitionId,
- String networkPartitionUuid){
+ public static void handleClusterRemoved(ClusterContext ctxt) {
+ Topology topology = TopologyManager.getTopology();
+ Service service = topology.getService(ctxt.getCartridgeType());
+ String deploymentPolicy;
+ if (service == null) {
+ log.warn(String.format("Service %s does not exist",
+ ctxt.getCartridgeType()));
+ return;
+ }
+
+ if (!service.clusterExists(ctxt.getClusterId())) {
+ log.warn(String.format("Cluster %s does not exist for service %s",
+ ctxt.getClusterId(),
+ ctxt.getCartridgeType()));
+ return;
+ }
+
+ try {
TopologyManager.acquireWriteLock();
+ Cluster cluster = service.removeCluster(ctxt.getClusterId());
+ deploymentPolicy = cluster.getDeploymentPolicyName();
+ TopologyManager.updateTopology(topology);
+ } finally {
+ TopologyManager.releaseWriteLock();
+ }
+ TopologyEventPublisher.sendClusterRemovedEvent(ctxt, deploymentPolicy);
+ }
- try {
- Topology topology = TopologyManager.getTopology();
- Service service = topology.getService(serviceUuid);
- if (service == null) {
- log.error("Service " + serviceUuid +
- " not found in Topology, unable to update the
cluster status to Created");
- return;
- }
+ /**
+ * Add member object to the topology and publish member created event
+ *
+ * @param memberContext
+ */
+ public static void handleMemberCreatedEvent(MemberContext memberContext) {
+ Topology topology = TopologyManager.getTopology();
- Cluster cluster = service.getCluster(clusterId);
- if (cluster == null) {
- log.error("Cluster " + clusterId + " not found in
Topology, unable to update " +
- "status to Created");
- return;
- }
+ Service service =
topology.getService(memberContext.getCartridgeType());
+ String clusterId = memberContext.getClusterId();
+ Cluster cluster = service.getCluster(clusterId);
+ String memberId = memberContext.getMemberId();
+ String clusterInstanceId = memberContext.getClusterInstanceId();
+ String networkPartitionId = memberContext.getNetworkPartitionId();
+ String partitionId = memberContext.getPartition().getId();
+ String lbClusterId = memberContext.getLbClusterId();
+ long initTime = memberContext.getInitTime();
+
+ if (cluster.memberExists(memberId)) {
+ log.warn(String.format("Member %s already exists", memberId));
+ return;
+ }
- if (cluster.getInstanceContexts(instanceId) != null) {
- log.warn("The Instance context for the cluster already
exists for [cluster] " +
- clusterId + " [instance-id] " + instanceId);
- return;
- }
+ try {
+ TopologyManager.acquireWriteLock();
+ Member member = new Member(service.getServiceName(), clusterId,
memberId, clusterInstanceId,
+ networkPartitionId, partitionId,
memberContext.getLoadBalancingIPType(), initTime);
+ member.setStatus(MemberStatus.Created);
+ member.setLbClusterId(lbClusterId);
+
member.setProperties(CloudControllerUtil.toJavaUtilProperties(memberContext.getProperties()));
+ cluster.addMember(member);
+ TopologyManager.updateTopology(topology);
+ } finally {
+ TopologyManager.releaseWriteLock();
+ }
- ClusterInstance clusterInstance = new ClusterInstance(alias,
clusterId, instanceId);
- clusterInstance.setNetworkPartitionUuid(networkPartitionUuid);
- clusterInstance.setPartitionId(partitionId);
- cluster.addInstanceContext(instanceId, clusterInstance);
- TopologyManager.updateTopology(topology);
+ TopologyEventPublisher.sendMemberCreatedEvent(memberContext);
+ }
- ClusterInstanceCreatedEvent clusterInstanceCreatedEvent =
- new ClusterInstanceCreatedEvent(serviceUuid, clusterId,
- clusterInstance);
- clusterInstanceCreatedEvent.setPartitionId(partitionId);
-
TopologyEventPublisher.sendClusterInstanceCreatedEvent(clusterInstanceCreatedEvent);
+ /**
+ * Update member status to initialized and publish member initialized event
+ *
+ * @param memberContext
+ */
+ public static void handleMemberInitializedEvent(MemberContext
memberContext) {
+ Topology topology = TopologyManager.getTopology();
+ Service service =
topology.getService(memberContext.getCartridgeType());
+ if (service == null) {
+ log.warn(String.format("Service %s does not exist",
+ memberContext.getCartridgeType()));
+ return;
+ }
+ if (!service.clusterExists(memberContext.getClusterId())) {
+ log.warn(String.format("Cluster %s does not exist in service %s",
+ memberContext.getClusterId(),
+ memberContext.getCartridgeType()));
+ return;
+ }
- } finally {
- TopologyManager.releaseWriteLock();
- }
+ Member member = service.getCluster(memberContext.getClusterId()).
+ getMember(memberContext.getMemberId());
+ if (member == null) {
+ log.warn(String.format("Member %s does not exist",
+ memberContext.getMemberId()));
+ return;
}
+ try {
+ TopologyManager.acquireWriteLock();
- public static void handleClusterRemoved (ClusterContext ctxt){
- Topology topology = TopologyManager.getTopology();
- Service service = topology.getService(ctxt.getCartridgeUuid());
- String deploymentPolicy;
- if (service == null) {
- log.warn(String.format("Service %s does not exist",
- ctxt.getCartridgeUuid()));
- return;
+ // Set ip addresses
+ member.setDefaultPrivateIP(memberContext.getDefaultPrivateIP());
+ if (memberContext.getPrivateIPs() != null) {
+
member.setMemberPrivateIPs(Arrays.asList(memberContext.getPrivateIPs()));
+ }
+ member.setDefaultPublicIP(memberContext.getDefaultPublicIP());
+ if (memberContext.getPublicIPs() != null) {
+
member.setMemberPublicIPs(Arrays.asList(memberContext.getPublicIPs()));
}
- if (!service.clusterExists(ctxt.getClusterId())) {
- log.warn(String.format("Cluster %s does not exist for service
%s",
- ctxt.getClusterId(),
- ctxt.getCartridgeUuid()));
+ // try update lifecycle state
+ if (!member.isStateTransitionValid(MemberStatus.Initialized)) {
+ log.error("Invalid state transition from " +
member.getStatus() + " to " +
+ MemberStatus.Initialized);
return;
- }
+ } else {
- try {
- TopologyManager.acquireWriteLock();
- Cluster cluster = service.removeCluster(ctxt.getClusterId());
- deploymentPolicy = cluster.getDeploymentPolicyName();
- TopologyManager.updateTopology(topology);
- } finally {
- TopologyManager.releaseWriteLock();
- }
- TopologyEventPublisher.sendClusterRemovedEvent(ctxt,
deploymentPolicy);
- }
+ Cluster cluster =
service.getCluster(memberContext.getClusterId());
+ String clusterId = cluster.getClusterId();
+ ClusterContext clusterContext =
CloudControllerContext.getInstance().getClusterContext(clusterId);
+ List<KubernetesService> kubernetesServices =
clusterContext.getKubernetesServices();
- /**
- * Add member object to the topology and publish member created event
- *
- * @param memberContext
- */
- public static void handleMemberCreatedEvent (MemberContext
memberContext){
- Topology topology = TopologyManager.getTopology();
+ if (kubernetesServices != null) {
+ cluster.setKubernetesServices(kubernetesServices);
+ }
- Service service =
topology.getService(memberContext.getCartridgeType());
- String clusterId = memberContext.getClusterId();
- Cluster cluster = service.getCluster(clusterId);
- String memberId = memberContext.getMemberId();
- String clusterInstanceId = memberContext.getClusterInstanceId();
- String networkPartitionId = memberContext.getNetworkPartitionId();
- String partitionId = memberContext.getPartition().getUuid();
- String lbClusterId = memberContext.getLbClusterId();
- long initTime = memberContext.getInitTime();
+ member.setStatus(MemberStatus.Initialized);
+ log.info("Member status updated to initialized");
+ TopologyManager.updateTopology(topology);
- if (cluster.memberExists(memberId)) {
- log.warn(String.format("Member %s already exists", memberId));
- return;
+
TopologyEventPublisher.sendMemberInitializedEvent(memberContext);
+ //publishing data
+ BAMUsageDataPublisher.publish(memberContext.getMemberId(),
+ memberContext.getPartition().getId(),
+ memberContext.getNetworkPartitionId(),
+ memberContext.getClusterId(),
+ memberContext.getCartridgeType(),
+ MemberStatus.Initialized.toString(),
+ null);
}
-
- try {
- TopologyManager.acquireWriteLock();
- Member member = new Member(service.getServiceName(),
clusterId, memberId, clusterInstanceId,
- networkPartitionId, partitionId,
memberContext.getLoadBalancingIPType(), initTime);
- member.setStatus(MemberStatus.Created);
- member.setLbClusterId(lbClusterId);
-
member.setProperties(CloudControllerUtil.toJavaUtilProperties(memberContext.getProperties()));
- cluster.addMember(member);
- TopologyManager.updateTopology(topology);
} finally {
TopologyManager.releaseWriteLock();
}
+ }
- TopologyEventPublisher.sendMemberCreatedEvent(memberContext);
+ private static int findKubernetesServicePort(String clusterId,
List<KubernetesService> kubernetesServices,
+ PortMapping portMapping) {
+ for (KubernetesService kubernetesService : kubernetesServices) {
+ if
(kubernetesService.getProtocol().equals(portMapping.getProtocol())) {
+ return kubernetesService.getPort();
+ }
+ }
+ throw new RuntimeException("Kubernetes service port not found:
[cluster-id] " + clusterId + " [port] "
+ + portMapping.getPort());
}
- /**
- * Update member status to initialized and publish member initialized
event
- *
- * @param memberContext
- */
- public static void handleMemberInitializedEvent (MemberContext
memberContext){
+ public static void handleMemberStarted(InstanceStartedEvent
instanceStartedEvent) {
+ try {
Topology topology = TopologyManager.getTopology();
- Service service =
topology.getService(memberContext.getCartridgeType());
+ Service service =
topology.getService(instanceStartedEvent.getServiceName());
if (service == null) {
log.warn(String.format("Service %s does not exist",
- memberContext.getCartridgeType()));
+ instanceStartedEvent.getServiceName()));
return;
}
- if (!service.clusterExists(memberContext.getClusterId())) {
+ if (!service.clusterExists(instanceStartedEvent.getClusterId())) {
log.warn(String.format("Cluster %s does not exist in service
%s",
- memberContext.getClusterId(),
- memberContext.getCartridgeType()));
+ instanceStartedEvent.getClusterId(),
+ instanceStartedEvent.getServiceName()));
return;
}
- Member member = service.getCluster(memberContext.getClusterId()).
- getMember(memberContext.getMemberId());
+ Cluster cluster =
service.getCluster(instanceStartedEvent.getClusterId());
+ Member member =
cluster.getMember(instanceStartedEvent.getMemberId());
if (member == null) {
log.warn(String.format("Member %s does not exist",
- memberContext.getMemberId()));
+ instanceStartedEvent.getMemberId()));
return;
}
try {
TopologyManager.acquireWriteLock();
-
- // Set ip addresses
-
member.setDefaultPrivateIP(memberContext.getDefaultPrivateIP());
- if (memberContext.getPrivateIPs() != null) {
-
member.setMemberPrivateIPs(Arrays.asList(memberContext.getPrivateIPs()));
- }
- member.setDefaultPublicIP(memberContext.getDefaultPublicIP());
- if (memberContext.getPublicIPs() != null) {
-
member.setMemberPublicIPs(Arrays.asList(memberContext.getPublicIPs()));
- }
-
// try update lifecycle state
- if (!member.isStateTransitionValid(MemberStatus.Initialized)) {
- log.error("Invalid state transition from " +
member.getStatus() + " to " +
- MemberStatus.Initialized);
+ if (!member.isStateTransitionValid(MemberStatus.Starting)) {
+ log.error("Invalid State Transition from " +
member.getStatus() + " to " +
+ MemberStatus.Starting);
return;
} else {
- member.setStatus(MemberStatus.Initialized);
- log.info("Member status updated to initialized");
+ member.setStatus(MemberStatus.Starting);
+ log.info("member started event adding status started");
TopologyManager.updateTopology(topology);
- //member intialized time
- Long timeStamp = System.currentTimeMillis();
-
TopologyEventPublisher.sendMemberInitializedEvent(memberContext);
+ //memberStartedEvent.
+
TopologyEventPublisher.sendMemberStartedEvent(instanceStartedEvent);
//publishing data
- BAMUsageDataPublisher.publish(memberContext.getMemberId(),
- memberContext.getPartition().getUuid(),
- memberContext.getNetworkPartitionId(),
- memberContext.getClusterInstanceId(),
- memberContext.getClusterId(),
- memberContext.getCartridgeType(),
- MemberStatus.Initialized.toString(),
- timeStamp, null, null, null);
+
BAMUsageDataPublisher.publish(instanceStartedEvent.getMemberId(),
+ instanceStartedEvent.getPartitionId(),
+ instanceStartedEvent.getNetworkPartitionId(),
+ instanceStartedEvent.getClusterId(),
+ instanceStartedEvent.getServiceName(),
+ MemberStatus.Starting.toString(),
+ null);
}
} finally {
TopologyManager.releaseWriteLock();
}
+ } catch (Exception e) {
+ String message = String.format("Could not handle member started
event: [application-id] %s " +
+ "[service-name] %s [member-id] %s",
instanceStartedEvent.getApplicationId(),
+ instanceStartedEvent.getServiceName(),
instanceStartedEvent.getMemberId());
+ log.warn(message, e);
}
+ }
- private static int findKubernetesServicePort (String clusterId, List <
KubernetesService > kubernetesServices,
- PortMapping portMapping){
- for (KubernetesService kubernetesService : kubernetesServices) {
- if
(kubernetesService.getProtocol().equals(portMapping.getProtocol())) {
- return kubernetesService.getPort();
- }
- }
- throw new RuntimeException("Kubernetes service port not found:
[cluster-id] " + clusterId + " [port] "
- + portMapping.getPort());
+ public static void handleMemberActivated(InstanceActivatedEvent
instanceActivatedEvent) {
+ Topology topology = TopologyManager.getTopology();
+ Service service =
topology.getService(instanceActivatedEvent.getServiceName());
+ if (service == null) {
+ log.warn(String.format("Service %s does not exist",
+ instanceActivatedEvent.getServiceName()));
+ return;
}
- public static void handleMemberStarted (InstanceStartedEvent
instanceStartedEvent){
- try {
- Topology topology = TopologyManager.getTopology();
- Service service =
topology.getService(instanceStartedEvent.getServiceName());
- if (service == null) {
- log.warn(String.format("Service %s does not exist",
- instanceStartedEvent.getServiceName()));
- return;
- }
- if
(!service.clusterExists(instanceStartedEvent.getClusterId())) {
- log.warn(String.format("Cluster %s does not exist in
service %s",
- instanceStartedEvent.getClusterId(),
- instanceStartedEvent.getServiceName()));
- return;
- }
+ Cluster cluster =
service.getCluster(instanceActivatedEvent.getClusterId());
+ if (cluster == null) {
+ log.warn(String.format("Cluster %s does not exist",
+ instanceActivatedEvent.getClusterId()));
+ return;
+ }
- Cluster cluster =
service.getCluster(instanceStartedEvent.getClusterId());
- Member member =
cluster.getMember(instanceStartedEvent.getMemberId());
- if (member == null) {
- log.warn(String.format("Member %s does not exist",
- instanceStartedEvent.getMemberId()));
- return;
- }
+ Member member =
cluster.getMember(instanceActivatedEvent.getMemberId());
+ if (member == null) {
+ log.warn(String.format("Member %s does not exist",
+ instanceActivatedEvent.getMemberId()));
+ return;
+ }
+
+ MemberActivatedEvent memberActivatedEvent = new MemberActivatedEvent(
+ instanceActivatedEvent.getServiceName(),
+ instanceActivatedEvent.getClusterId(),
+ instanceActivatedEvent.getClusterInstanceId(),
+ instanceActivatedEvent.getMemberId(),
+ instanceActivatedEvent.getNetworkPartitionId(),
+ instanceActivatedEvent.getPartitionId());
+
+ // grouping - set grouid
+ //TODO
+ memberActivatedEvent.setApplicationId(null);
+ try {
+ TopologyManager.acquireWriteLock();
+ // try update lifecycle state
+ if (!member.isStateTransitionValid(MemberStatus.Active)) {
+ log.error("Invalid state transition from [" +
member.getStatus() + "] to [" + MemberStatus.Active + "]");
+ return;
+ } else {
+ member.setStatus(MemberStatus.Active);
+ // Set member ports
try {
- TopologyManager.acquireWriteLock();
- // try update lifecycle state
- if (!member.isStateTransitionValid(MemberStatus.Starting))
{
- log.error("Invalid State Transition from " +
member.getStatus() + " to " +
- MemberStatus.Starting);
- return;
- } else {
- member.setStatus(MemberStatus.Starting);
- log.info("member started event adding status started");
+ Cartridge cartridge =
CloudControllerContext.getInstance().getCartridge(service.getServiceName());
+ if (cartridge == null) {
+ throw new RuntimeException(String.format("Cartridge
not found: [cartridge-type] %s",
+ service.getServiceName()));
+ }
- TopologyManager.updateTopology(topology);
- //member started time
- Long timeStamp = System.currentTimeMillis();
- //memberStartedEvent.
-
TopologyEventPublisher.sendMemberStartedEvent(instanceStartedEvent);
- //publishing data
-
BAMUsageDataPublisher.publish(instanceStartedEvent.getMemberId(),
- instanceStartedEvent.getPartitionId(),
- instanceStartedEvent.getNetworkPartitionId(),
- instanceStartedEvent.getClusterInstanceId(),
- instanceStartedEvent.getClusterId(),
- instanceStartedEvent.getServiceName(),
- MemberStatus.Starting.toString(),
- timeStamp, null, null, null);
+ Port port;
+ int portValue;
+ List<PortMapping> portMappings =
Arrays.asList(cartridge.getPortMappings());
+ String clusterId = cluster.getClusterId();
+ ClusterContext clusterContext =
CloudControllerContext.getInstance().getClusterContext(clusterId);
+ List<KubernetesService> kubernetesServices =
clusterContext.getKubernetesServices();
+
+ for (PortMapping portMapping : portMappings) {
+ if (kubernetesServices != null) {
+ portValue = findKubernetesServicePort(clusterId,
kubernetesServices, portMapping);
+ } else {
+ portValue = portMapping.getPort();
+ }
+ port = new Port(portMapping.getProtocol(), portValue,
portMapping.getProxyPort());
+ member.addPort(port);
+ memberActivatedEvent.addPort(port);
}
- } finally {
- TopologyManager.releaseWriteLock();
+ } catch (Exception e) {
+ String message = String.format("Could not add member
ports: [service-name] %s [member-id] %s",
+ memberActivatedEvent.getServiceName(),
memberActivatedEvent.getMemberId());
+ log.error(message, e);
}
- } catch (Exception e) {
- String message = String.format("Could not handle member
started event: [application-id] %s " +
- "[service-name] %s [member-id] %s",
instanceStartedEvent.getApplicationId(),
- instanceStartedEvent.getServiceName(),
instanceStartedEvent.getMemberId());
- log.warn(message, e);
+
+ // Set member ip addresses
+
memberActivatedEvent.setDefaultPrivateIP(member.getDefaultPrivateIP());
+
memberActivatedEvent.setMemberPrivateIPs(member.getMemberPrivateIPs());
+
memberActivatedEvent.setDefaultPublicIP(member.getDefaultPublicIP());
+
memberActivatedEvent.setMemberPublicIPs(member.getMemberPublicIPs());
+ TopologyManager.updateTopology(topology);
+
+ // Publish member activated event
+
TopologyEventPublisher.sendMemberActivatedEvent(memberActivatedEvent);
+
+ // Publish statistics data
+
BAMUsageDataPublisher.publish(memberActivatedEvent.getMemberId(),
+ memberActivatedEvent.getPartitionId(),
+ memberActivatedEvent.getNetworkPartitionId(),
+ memberActivatedEvent.getClusterId(),
+ memberActivatedEvent.getServiceName(),
+ MemberStatus.Active.toString(),
+ null);
}
+ } finally {
+ TopologyManager.releaseWriteLock();
}
+ }
- public static void handleMemberActivated (InstanceActivatedEvent
instanceActivatedEvent){
- Topology topology = TopologyManager.getTopology();
- Service service =
topology.getService(instanceActivatedEvent.getServiceName());
- if (service == null) {
- log.warn(String.format("Service %s does not exist",
- instanceActivatedEvent.getServiceName()));
- return;
- }
+ public static void
handleMemberReadyToShutdown(InstanceReadyToShutdownEvent
instanceReadyToShutdownEvent)
+ throws InvalidMemberException, InvalidCartridgeTypeException {
+ Topology topology = TopologyManager.getTopology();
+ Service service =
topology.getService(instanceReadyToShutdownEvent.getServiceName());
+ //update the status of the member
+ if (service == null) {
+ log.warn(String.format("Service %s does not exist",
+ instanceReadyToShutdownEvent.getServiceName()));
+ return;
+ }
- Cluster cluster =
service.getCluster(instanceActivatedEvent.getClusterId());
- if (cluster == null) {
- log.warn(String.format("Cluster %s does not exist",
- instanceActivatedEvent.getClusterId()));
- return;
- }
+ Cluster cluster =
service.getCluster(instanceReadyToShutdownEvent.getClusterId());
+ if (cluster == null) {
+ log.warn(String.format("Cluster %s does not exist",
+ instanceReadyToShutdownEvent.getClusterId()));
+ return;
+ }
- Member member =
cluster.getMember(instanceActivatedEvent.getMemberId());
- if (member == null) {
- log.warn(String.format("Member %s does not exist",
- instanceActivatedEvent.getMemberId()));
+
+ Member member =
cluster.getMember(instanceReadyToShutdownEvent.getMemberId());
+ if (member == null) {
+ log.warn(String.format("Member %s does not exist",
+ instanceReadyToShutdownEvent.getMemberId()));
+ return;
+ }
+ MemberReadyToShutdownEvent memberReadyToShutdownEvent = new
MemberReadyToShutdownEvent(
+ instanceReadyToShutdownEvent.getServiceName(),
+ instanceReadyToShutdownEvent.getClusterId(),
+ instanceReadyToShutdownEvent.getClusterInstanceId(),
+ instanceReadyToShutdownEvent.getMemberId(),
+ instanceReadyToShutdownEvent.getNetworkPartitionId(),
+ instanceReadyToShutdownEvent.getPartitionId());
+ try {
+ TopologyManager.acquireWriteLock();
+
+ if (!member.isStateTransitionValid(MemberStatus.ReadyToShutDown)) {
+ log.error("Invalid State Transition from " +
member.getStatus() + " to " +
+ MemberStatus.ReadyToShutDown);
return;
}
+ member.setStatus(MemberStatus.ReadyToShutDown);
+ log.info("Member Ready to shut down event adding status started");
- MemberActivatedEvent memberActivatedEvent = new
MemberActivatedEvent(
- instanceActivatedEvent.getServiceName(),
- instanceActivatedEvent.getClusterId(),
- instanceActivatedEvent.getClusterInstanceId(),
- instanceActivatedEvent.getMemberId(),
- instanceActivatedEvent.getNetworkPartitionId(),
- instanceActivatedEvent.getPartitionId());
-
- // grouping - set grouid
- //TODO
- memberActivatedEvent.setApplicationId(null);
- try {
- TopologyManager.acquireWriteLock();
- // try update lifecycle state
- if (!member.isStateTransitionValid(MemberStatus.Active)) {
- log.error("Invalid state transition from [" +
member.getStatus() + "] to [" +
- MemberStatus.Active + "]");
- return;
- } else {
- member.setStatus(MemberStatus.Active);
-
- // Set member ports
- try {
- Cartridge cartridge =
CloudControllerContext.getInstance().getCartridge(service.getServiceUuid());
- if (cartridge == null) {
- throw new
RuntimeException(String.format("Cartridge not found: [cartridge-type] %s",
- service.getServiceName()));
- }
+ TopologyManager.updateTopology(topology);
+ } finally {
+ TopologyManager.releaseWriteLock();
+ }
+
TopologyEventPublisher.sendMemberReadyToShutdownEvent(memberReadyToShutdownEvent);
+ //publishing data
+
BAMUsageDataPublisher.publish(instanceReadyToShutdownEvent.getMemberId(),
+ instanceReadyToShutdownEvent.getPartitionId(),
+ instanceReadyToShutdownEvent.getNetworkPartitionId(),
+ instanceReadyToShutdownEvent.getClusterId(),
+ instanceReadyToShutdownEvent.getServiceName(),
+ MemberStatus.ReadyToShutDown.toString(),
+ null);
+ //termination of particular instance will be handled by autoscaler
+ }
- Port port;
- int portValue;
- List<PortMapping> portMappings =
Arrays.asList(cartridge.getPortMappings());
- String clusterId = cluster.getClusterId();
- ClusterContext clusterContext =
CloudControllerContext.getInstance().getClusterContext(clusterId);
- List<KubernetesService> kubernetesServices =
clusterContext.getKubernetesServices();
+ public static void handleMemberMaintenance(InstanceMaintenanceModeEvent
instanceMaintenanceModeEvent)
+ throws InvalidMemberException, InvalidCartridgeTypeException {
+ Topology topology = TopologyManager.getTopology();
+ Service service =
topology.getService(instanceMaintenanceModeEvent.getServiceName());
+ //update the status of the member
+ if (service == null) {
+ log.warn(String.format("Service %s does not exist",
+ instanceMaintenanceModeEvent.getServiceName()));
+ return;
+ }
- for (PortMapping portMapping : portMappings) {
- if (kubernetesServices != null) {
- portValue =
findKubernetesServicePort(clusterId, kubernetesServices, portMapping);
- } else {
- portValue = portMapping.getPort();
- }
- port = new Port(portMapping.getProtocol(),
portValue, portMapping.getProxyPort());
- member.addPort(port);
- memberActivatedEvent.addPort(port);
- }
- } catch (Exception e) {
- String message = String.format("Could not add member
ports: [service-name] %s [member-id] %s",
- memberActivatedEvent.getServiceName(),
memberActivatedEvent.getMemberId());
- log.error(message, e);
- }
+ Cluster cluster =
service.getCluster(instanceMaintenanceModeEvent.getClusterId());
+ if (cluster == null) {
+ log.warn(String.format("Cluster %s does not exist",
+ instanceMaintenanceModeEvent.getClusterId()));
+ return;
+ }
- // Set member ip addresses
-
memberActivatedEvent.setDefaultPrivateIP(member.getDefaultPrivateIP());
-
memberActivatedEvent.setMemberPrivateIPs(member.getMemberPrivateIPs());
-
memberActivatedEvent.setDefaultPublicIP(member.getDefaultPublicIP());
-
memberActivatedEvent.setMemberPublicIPs(member.getMemberPublicIPs());
- TopologyManager.updateTopology(topology);
- //member activated time
- Long timeStamp = System.currentTimeMillis();
- // Publish member activated event
-
TopologyEventPublisher.sendMemberActivatedEvent(memberActivatedEvent);
-
- // Publish statistics data
-
BAMUsageDataPublisher.publish(memberActivatedEvent.getMemberId(),
- memberActivatedEvent.getPartitionId(),
- memberActivatedEvent.getNetworkPartitionId(),
- memberActivatedEvent.getClusterInstanceId(),
- memberActivatedEvent.getClusterId(),
- memberActivatedEvent.getServiceName(),
- MemberStatus.Active.toString(),
- timeStamp, null, null, null);
- }
- } finally {
- TopologyManager.releaseWriteLock();
- }
+ Member member =
cluster.getMember(instanceMaintenanceModeEvent.getMemberId());
+ if (member == null) {
+ log.warn(String.format("Member %s does not exist",
+ instanceMaintenanceModeEvent.getMemberId()));
+ return;
}
- public static void handleMemberReadyToShutdown
(InstanceReadyToShutdownEvent instanceReadyToShutdownEvent)
- throws InvalidMemberException, InvalidCartridgeTypeException {
- Topology topology = TopologyManager.getTopology();
- Service service =
topology.getService(instanceReadyToShutdownEvent.getServiceName());
- //update the status of the member
- if (service == null) {
- log.warn(String.format("Service %s does not exist",
- instanceReadyToShutdownEvent.getServiceName()));
- return;
- }
- Cluster cluster =
service.getCluster(instanceReadyToShutdownEvent.getClusterId());
- if (cluster == null) {
- log.warn(String.format("Cluster %s does not exist",
- instanceReadyToShutdownEvent.getClusterId()));
+ MemberMaintenanceModeEvent memberMaintenanceModeEvent = new
MemberMaintenanceModeEvent(
+ instanceMaintenanceModeEvent.getServiceName(),
+ instanceMaintenanceModeEvent.getClusterId(),
+ instanceMaintenanceModeEvent.getClusterInstanceId(),
+ instanceMaintenanceModeEvent.getMemberId(),
+ instanceMaintenanceModeEvent.getNetworkPartitionId(),
+ instanceMaintenanceModeEvent.getPartitionId());
+ try {
+ TopologyManager.acquireWriteLock();
+ // try update lifecycle state
+ if (!member.isStateTransitionValid(MemberStatus.In_Maintenance)) {
+ log.error("Invalid State Transition from " +
member.getStatus() + " to "
+ + MemberStatus.In_Maintenance);
return;
}
+ member.setStatus(MemberStatus.In_Maintenance);
+ log.info("member maintenance mode event adding status started");
+ TopologyManager.updateTopology(topology);
+ } finally {
+ TopologyManager.releaseWriteLock();
+ }
+ //publishing data
+
TopologyEventPublisher.sendMemberMaintenanceModeEvent(memberMaintenanceModeEvent);
- Member member =
cluster.getMember(instanceReadyToShutdownEvent.getMemberId());
- if (member == null) {
- log.warn(String.format("Member %s does not exist",
- instanceReadyToShutdownEvent.getMemberId()));
- return;
- }
- MemberReadyToShutdownEvent memberReadyToShutdownEvent = new
MemberReadyToShutdownEvent(
- instanceReadyToShutdownEvent.getServiceName(),
- instanceReadyToShutdownEvent.getClusterId(),
- instanceReadyToShutdownEvent.getClusterInstanceId(),
- instanceReadyToShutdownEvent.getMemberId(),
- instanceReadyToShutdownEvent.getNetworkPartitionId(),
- instanceReadyToShutdownEvent.getPartitionId());
- //member ReadyToShutDown state change time
- Long timeStamp = null;
- try {
- TopologyManager.acquireWriteLock();
+ }
- if
(!member.isStateTransitionValid(MemberStatus.ReadyToShutDown)) {
- log.error("Invalid State Transition from " +
member.getStatus() + " to " +
- MemberStatus.ReadyToShutDown);
- return;
- }
- member.setStatus(MemberStatus.ReadyToShutDown);
- log.info("Member Ready to shut down event adding status
started");
+ /**
+ * Remove member from topology and send member terminated event.
+ *
+ * @param serviceName
+ * @param clusterId
+ * @param networkPartitionId
+ * @param partitionId
+ * @param memberId
+ */
+ public static void handleMemberTerminated(String serviceName, String
clusterId,
+ String networkPartitionId,
String partitionId,
+ String memberId) {
+ Topology topology = TopologyManager.getTopology();
+ Service service = topology.getService(serviceName);
+ Properties properties;
+ if (service == null) {
+ log.warn(String.format("Service %s does not exist",
+ serviceName));
+ return;
+ }
+ Cluster cluster = service.getCluster(clusterId);
+ if (cluster == null) {
+ log.warn(String.format("Cluster %s does not exist",
+ clusterId));
+ return;
+ }
- TopologyManager.updateTopology(topology);
- timeStamp = System.currentTimeMillis();
- } finally {
- TopologyManager.releaseWriteLock();
- }
-
TopologyEventPublisher.sendMemberReadyToShutdownEvent(memberReadyToShutdownEvent);
- //publishing data
-
BAMUsageDataPublisher.publish(instanceReadyToShutdownEvent.getMemberId(),
- instanceReadyToShutdownEvent.getPartitionId(),
- instanceReadyToShutdownEvent.getNetworkPartitionId(),
- instanceReadyToShutdownEvent.getClusterInstanceId(),
- instanceReadyToShutdownEvent.getClusterId(),
- instanceReadyToShutdownEvent.getServiceName(),
- MemberStatus.ReadyToShutDown.toString(),
- timeStamp, null, null, null);
- //termination of particular instance will be handled by autoscaler
+ Member member = cluster.getMember(memberId);
+ if (member == null) {
+ log.warn(String.format("Member %s does not exist",
+ memberId));
+ return;
}
- public static void handleMemberMaintenance
(InstanceMaintenanceModeEvent instanceMaintenanceModeEvent)
- throws InvalidMemberException, InvalidCartridgeTypeException {
- Topology topology = TopologyManager.getTopology();
- Service service =
topology.getService(instanceMaintenanceModeEvent.getServiceName());
- //update the status of the member
- if (service == null) {
- log.warn(String.format("Service %s does not exist",
- instanceMaintenanceModeEvent.getServiceName()));
- return;
- }
+ String clusterInstanceId = member.getClusterInstanceId();
- Cluster cluster =
service.getCluster(instanceMaintenanceModeEvent.getClusterId());
- if (cluster == null) {
- log.warn(String.format("Cluster %s does not exist",
- instanceMaintenanceModeEvent.getClusterId()));
- return;
- }
+ try {
+ TopologyManager.acquireWriteLock();
+ properties = member.getProperties();
+ cluster.removeMember(member);
+ TopologyManager.updateTopology(topology);
+ } finally {
+ TopologyManager.releaseWriteLock();
+ }
+ /* @TODO leftover from grouping_poc*/
+ String groupAlias = null;
+ TopologyEventPublisher.sendMemberTerminatedEvent(serviceName,
clusterId, memberId,
+ clusterInstanceId, networkPartitionId,
+ partitionId, properties, groupAlias);
+ }
- Member member =
cluster.getMember(instanceMaintenanceModeEvent.getMemberId());
- if (member == null) {
- log.warn(String.format("Member %s does not exist",
- instanceMaintenanceModeEvent.getMemberId()));
- return;
- }
+ public static void handleMemberSuspended() {
+ //TODO
+ try {
+ TopologyManager.acquireWriteLock();
+ } finally {
+ TopologyManager.releaseWriteLock();
+ }
+ }
+ public static void
handleClusterActivatedEvent(ClusterStatusClusterActivatedEvent
clusterStatusClusterActivatedEvent) {
- MemberMaintenanceModeEvent memberMaintenanceModeEvent = new
MemberMaintenanceModeEvent(
- instanceMaintenanceModeEvent.getServiceName(),
- instanceMaintenanceModeEvent.getClusterId(),
- instanceMaintenanceModeEvent.getClusterInstanceId(),
- instanceMaintenanceModeEvent.getMemberId(),
- instanceMaintenanceModeEvent.getNetworkPartitionId(),
- instanceMaintenanceModeEvent.getPartitionId());
- try {
- TopologyManager.acquireWriteLock();
- // try update lifecycle state
- if
(!member.isStateTransitionValid(MemberStatus.In_Maintenance)) {
- log.error("Invalid State Transition from " +
member.getStatus() + " to "
- + MemberStatus.In_Maintenance);
- return;
- }
- member.setStatus(MemberStatus.In_Maintenance);
- log.info("member maintenance mode event adding status
started");
+ Topology topology = TopologyManager.getTopology();
+ Service service =
topology.getService(clusterStatusClusterActivatedEvent.getServiceName());
+ //update the status of the cluster
+ if (service == null) {
+ log.warn(String.format("Service %s does not exist",
+ clusterStatusClusterActivatedEvent.getServiceName()));
+ return;
+ }
- TopologyManager.updateTopology(topology);
- } finally {
- TopologyManager.releaseWriteLock();
- }
- //publishing data
-
TopologyEventPublisher.sendMemberMaintenanceModeEvent(memberMaintenanceModeEvent);
+ Cluster cluster =
service.getCluster(clusterStatusClusterActivatedEvent.getClusterId());
+ if (cluster == null) {
+ log.warn(String.format("Cluster %s does not exist",
+ clusterStatusClusterActivatedEvent.getClusterId()));
+ return;
+ }
+ String clusterId = cluster.getClusterId();
+ ClusterContext clusterContext =
CloudControllerContext.getInstance().getClusterContext(clusterId);
+ if (clusterContext == null) {
+ log.warn("Cluster context not found: [cluster-id] " + clusterId);
+ return;
}
- /**
- * Remove member from topology and send member terminated event.
- *
- * @param serviceName
- * @param clusterId
- * @param networkPartitionId
- * @param partitionId
- * @param memberId
- */
- public static void handleMemberTerminated (String serviceName, String
clusterId,
- String networkPartitionId, String partitionId,
- String memberId){
- Topology topology = TopologyManager.getTopology();
- Service service = topology.getService(serviceName);
- Properties properties;
- if (service == null) {
- log.warn(String.format("Service %s does not exist",
- serviceName));
- return;
- }
- Cluster cluster = service.getCluster(clusterId);
- if (cluster == null) {
- log.warn(String.format("Cluster %s does not exist",
- clusterId));
- return;
- }
+ ClusterInstanceActivatedEvent clusterInstanceActivatedEvent =
+ new ClusterInstanceActivatedEvent(
+ clusterStatusClusterActivatedEvent.getAppId(),
+ clusterStatusClusterActivatedEvent.getServiceName(),
+ clusterStatusClusterActivatedEvent.getClusterId(),
+ clusterStatusClusterActivatedEvent.getInstanceId());
+ try {
+ TopologyManager.acquireWriteLock();
+ List<KubernetesService> kubernetesServices =
clusterContext.getKubernetesServices();
- Member member = cluster.getMember(memberId);
- if (member == null) {
- log.warn(String.format("Member %s does not exist",
- memberId));
- return;
+ if (kubernetesServices != null) {
+
+ try {
+ // Generate access URLs for kubernetes services
+ for (KubernetesService kubernetesService :
kubernetesServices) {
+
+ if
(kubernetesService.getServiceType().equals(KubernetesConstants.NODE_PORT)) {
+ // Public IP = Kubernetes minion public IP
+ String[] publicIPs =
kubernetesService.getPublicIPs();
+ if ((publicIPs != null) && (publicIPs.length > 0))
{
+ for (String publicIP : publicIPs) {
+ // There can be a String array with null
values
+ if (publicIP != null) {
+ // Using type URI since only http,
https, ftp, file, jar protocols are supported in URL
+ URI accessURL = new
URI(kubernetesService.getProtocol(), null, publicIP,
+ kubernetesService.getPort(),
null, null, null);
+
cluster.addAccessUrl(accessURL.toString());
+
clusterInstanceActivatedEvent.addAccessUrl(accessURL.toString());
+ } else {
+ log.error(String.format("Could not
create access URL for [Kubernetes-service] %s , " +
+ "since Public IP is not
available", kubernetesService.getId()));
+ }
+ }
+ }
+ }
+ }
+ } catch (URISyntaxException e) {
+ log.error("Could not create access URLs for Kubernetes
services", e);
+ }
}
- String clusterInstanceId = member.getClusterInstanceId();
+ ClusterInstance context =
cluster.getInstanceContexts(clusterStatusClusterActivatedEvent.getInstanceId());
- try {
- TopologyManager.acquireWriteLock();
- properties = member.getProperties();
- cluster.removeMember(member);
+ if (context == null) {
+ log.warn("Cluster instance context is not found for [cluster]
" +
+ clusterStatusClusterActivatedEvent.getClusterId() + "
[instance-id] " +
+ clusterStatusClusterActivatedEvent.getInstanceId());
+ return;
+ }
+ ClusterStatus status = ClusterStatus.Active;
+ if (context.isStateTransitionValid(status)) {
+ context.setStatus(status);
+ log.info("Cluster activated adding status started for " +
cluster.getClusterId());
TopologyManager.updateTopology(topology);
- } finally {
- TopologyManager.releaseWriteLock();
+ // publish event
+
TopologyEventPublisher.sendClusterActivatedEvent(clusterInstanceActivatedEvent);
+ } else {
+ log.error(String.format("Cluster state transition is not
valid: [cluster-id] %s " +
+ " [instance-id] %s [current-status] %s
[status-requested] %s",
+ clusterStatusClusterActivatedEvent.getClusterId(),
clusterStatusClusterActivatedEvent.getInstanceId(),
+ context.getStatus(), status));
+ return;
}
- /* @TODO leftover from grouping_poc*/
- String groupAlias = null;
- TopologyEventPublisher.sendMemberTerminatedEvent(serviceName,
clusterId, memberId,
- clusterInstanceId, networkPartitionId,
- partitionId, properties, groupAlias);
+ } finally {
+ TopologyManager.releaseWriteLock();
}
- public static void handleMemberSuspended () {
- //TODO
- try {
- TopologyManager.acquireWriteLock();
- } finally {
- TopologyManager.releaseWriteLock();
- }
- }
+ }
- public static void handleClusterActivatedEvent
(ClusterStatusClusterActivatedEvent
- clusterStatusClusterActivatedEvent){
+ public static void handleClusterInactivateEvent(
+ ClusterStatusClusterInactivateEvent clusterInactivateEvent) {
+ Topology topology = TopologyManager.getTopology();
+ Service service =
topology.getService(clusterInactivateEvent.getServiceName());
+ //update the status of the cluster
+ if (service == null) {
+ log.warn(String.format("Service %s does not exist",
+ clusterInactivateEvent.getServiceName()));
+ return;
+ }
- Topology topology = TopologyManager.getTopology();
- Service service =
topology.getService(clusterStatusClusterActivatedEvent.getServiceName());
- //update the status of the cluster
- if (service == null) {
- log.warn(String.format("Service %s does not exist",
- clusterStatusClusterActivatedEvent.getServiceName()));
- return;
- }
+ Cluster cluster =
service.getCluster(clusterInactivateEvent.getClusterId());
+ if (cluster == null) {
+ log.warn(String.format("Cluster %s does not exist",
+ clusterInactivateEvent.getClusterId()));
+ return;
+ }
- Cluster cluster =
service.getCluster(clusterStatusClusterActivatedEvent.getClusterId());
- if (cluster == null) {
- log.warn(String.format("Cluster %s does not exist",
- clusterStatusClusterActivatedEvent.getClusterId()));
+ ClusterInstanceInactivateEvent clusterInactivatedEvent1 =
+ new ClusterInstanceInactivateEvent(
+ clusterInactivateEvent.getAppId(),
+ clusterInactivateEvent.getServiceName(),
+ clusterInactivateEvent.getClusterId(),
+ clusterInactivateEvent.getInstanceId());
+ try {
+ TopologyManager.acquireWriteLock();
+ ClusterInstance context =
cluster.getInstanceContexts(clusterInactivateEvent.getInstanceId());
+ if (context == null) {
+ log.warn("Cluster Instance Context is not found for [cluster]
" +
+ clusterInactivateEvent.getClusterId() + "
[instance-id] " +
+ clusterInactivateEvent.getInstanceId());
return;
}
-
- String clusterId = cluster.getClusterId();
- ClusterContext clusterContext =
CloudControllerContext.getInstance().getClusterContext(clusterId);
- if (clusterContext == null) {
- log.warn("Cluster context not found: [cluster-id] " +
clusterId);
+ ClusterStatus status = ClusterStatus.Inactive;
+ if (context.isStateTransitionValid(status)) {
+ context.setStatus(status);
+ log.info("Cluster Inactive adding status started for" +
cluster.getClusterId());
+ TopologyManager.updateTopology(topology);
+ //publishing data
+
TopologyEventPublisher.sendClusterInactivateEvent(clusterInactivatedEvent1);
+ } else {
+ log.error(String.format("Cluster state transition is not
valid: [cluster-id] %s " +
+ " [instance-id] %s [current-status] %s
[status-requested] %s",
+ clusterInactivateEvent.getClusterId(),
clusterInactivateEvent.getInstanceId(),
+ context.getStatus(), status));
return;
}
+ } finally {
+ TopologyManager.releaseWriteLock();
+ }
+ }
- ClusterInstanceActivatedEvent clusterInstanceActivatedEvent =
- new ClusterInstanceActivatedEvent(
- clusterStatusClusterActivatedEvent.getAppId(),
-
clusterStatusClusterActivatedEvent.getServiceName(),
- clusterStatusClusterActivatedEvent.getClusterId(),
-
clusterStatusClusterActivatedEvent.getInstanceId());
- try {
- TopologyManager.acquireWriteLock();
- List<KubernetesService> kubernetesServices =
clusterContext.getKubernetesServices();
- cluster.setKubernetesServices(kubernetesServices);
+ private static void
deleteAppResourcesFromMetadataService(ApplicationInstanceTerminatedEvent event)
{
+ try {
+ MetaDataServiceClient metadataClient = new
DefaultMetaDataServiceClient();
+ metadataClient.deleteApplicationProperties(event.getAppId());
+ } catch (Exception e) {
+ log.error("Error occurred while deleting the application resources
frm metadata service ", e);
+ }
+ }
- ClusterInstance context =
cluster.getInstanceContexts(clusterStatusClusterActivatedEvent.getInstanceId());
- if (context == null) {
- log.warn("Cluster instance context is not found for
[cluster] " +
- clusterStatusClusterActivatedEvent.getClusterId()
+ " [instance-id] " +
-
clusterStatusClusterActivatedEvent.getInstanceId());
- return;
- }
- ClusterStatus status = ClusterStatus.Active;
- if (context.isStateTransitionValid(status)) {
- context.setStatus(status);
- log.info("Cluster activated adding status started for " +
cluster.getClusterId());
- TopologyManager.updateTopology(topology);
- // publish event
-
TopologyEventPublisher.sendClusterActivatedEvent(clusterInstanceActivatedEvent);
- } else {
- log.error(String.format("Cluster state transition is not
valid: [cluster-id] %s " +
- " [instance-id] %s [current-status] %s
[status-requested] %s",
- clusterStatusClusterActivatedEvent.getClusterId(),
clusterStatusClusterActivatedEvent.getInstanceId(),
- context.getStatus(), status));
- return;
- }
- } finally {
- TopologyManager.releaseWriteLock();
- }
+ public static void
handleClusterTerminatedEvent(ClusterStatusClusterTerminatedEvent event) {
- }
+ TopologyManager.acquireWriteLock();
- public static void handleClusterInactivateEvent (
- ClusterStatusClusterInactivateEvent clusterInactivateEvent){
+ try {
Topology topology = TopologyManager.getTopology();
- Service service =
topology.getService(clusterInactivateEvent.getServiceName());
+ Service service = topology.getService(event.getServiceName());
+
//update the status of the cluster
if (service == null) {
log.warn(String.format("Service %s does not exist",
- clusterInactivateEvent.getServiceName()));
+ event.getServiceName()));
return;
}
- Cluster cluster =
service.getCluster(clusterInactivateEvent.getClusterId());
+ Cluster cluster = service.getCluster(event.getClusterId());
if (cluster == null) {
log.warn(String.format("Cluster %s does not exist",
- clusterInactivateEvent.getClusterId()));
+ event.getClusterId()));
return;
}
- ClusterInstanceInactivateEvent clusterInactivatedEvent1 =
- new ClusterInstanceInactivateEvent(
- clusterInactivateEvent.getAppId(),
- clusterInactivateEvent.getServiceName(),
- clusterInactivateEvent.getClusterId(),
- clusterInactivateEvent.getInstanceId());
- try {
- TopologyManager.acquireWriteLock();
- ClusterInstance context =
cluster.getInstanceContexts(clusterInactivateEvent.getInstanceId());
- if (context == null) {
- log.warn("Cluster Instance Context is not found for
[cluster] " +
- clusterInactivateEvent.getClusterId() + "
[instance-id] " +
- clusterInactivateEvent.getInstanceId());
- return;
- }
- ClusterStatus status = ClusterStatus.Inactive;
- if (context.isStateTransitionValid(status)) {
- context.setStatus(status);
- log.info("Cluster Inactive adding status started for" +
cluster.getClusterId());
- TopologyManager.updateTopology(topology);
- //publishing data
-
TopologyEventPublisher.sendClusterInactivateEvent(clusterInactivatedEvent1);
- } else {
- log.error(String.format("Cluster state transition is not
valid: [cluster-id] %s " +
- " [instance-id] %s [current-status] %s
[status-requested] %s",
- clusterInactivateEvent.getClusterId(),
clusterInactivateEvent.getInstanceId(),
- context.getStatus(), status));
- return;
- }
- } finally {
- TopologyManager.releaseWriteLock();
+ ClusterInstance context =
cluster.getInstanceContexts(event.getInstanceId());
+ if (context == null) {
+ log.warn("Cluster Instance Context is not found for [cluster]
" +
+ event.getClusterId() + " [instance-id] " +
+ event.getInstanceId());
+ return;
}
- }
-
+ ClusterStatus status = ClusterStatus.Terminated;
+ if (context.isStateTransitionValid(status)) {
+ context.setStatus(status);
+ log.info("Cluster Terminated adding status started for and
removing the cluster instance"
+ + cluster.getClusterId());
+ cluster.removeInstanceContext(event.getInstanceId());
+ TopologyManager.updateTopology(topology);
+ //publishing data
+ ClusterInstanceTerminatedEvent clusterTerminatedEvent = new
ClusterInstanceTerminatedEvent(event.getAppId(),
+ event.getServiceName(), event.getClusterId(),
event.getInstanceId());
- private static void deleteAppResourcesFromMetadataService
(ApplicationInstanceTerminatedEvent event){
- try {
- MetaDataServiceClient metadataClient = new
DefaultMetaDataServiceClient();
- metadataClient.deleteApplicationProperties(event.getAppId());
- } catch (Exception e) {
- log.error("Error occurred while deleting the application
resources frm metadata service ", e);
+
TopologyEventPublisher.sendClusterTerminatedEvent(clusterTerminatedEvent);
+ } else {
+ log.error(String.format("Cluster state transition is not
valid: [cluster-id] %s " +
+ " [instance-id] %s [current-status] %s
[status-requested] %s",
+ event.getClusterId(), event.getInstanceId(),
+ context.getStatus(), status));
+ return;
}
+ } finally {
+ TopologyManager.releaseWriteLock();
}
- public static void handleClusterTerminatedEvent
(ClusterStatusClusterTerminatedEvent event){
-
- TopologyManager.acquireWriteLock();
- try {
- Topology topology = TopologyManager.getTopology();
- Service service = topology.getService(event.getServiceName());
+ }
- //update the status of the cluster
- if (service == null) {
- log.warn(String.format("Service %s does not exist",
- event.getServiceName()));
- return;
- }
+ public static void
handleClusterTerminatingEvent(ClusterStatusClusterTerminatingEvent event) {
- Cluster cluster = service.getCluster(event.getClusterId());
- if (cluster == null) {
- log.warn(String.format("Cluster %s does not exist",
- event.getClusterId()));
- return;
- }
+ TopologyManager.acquireWriteLock();
- ClusterInstance context =
cluster.getInstanceContexts(event.getInstanceId());
- if (context == null) {
- log.warn("Cluster Instance Context is not found for
[cluster] " +
- event.getClusterId() + " [instance-id] " +
- event.getInstanceId());
- return;
- }
- ClusterStatus status = ClusterStatus.Terminated;
- if (context.isStateTransitionValid(status)) {
- context.setStatus(status);
- log.info("Cluster Terminated adding status started for and
removing the cluster instance"
- + cluster.getClusterId());
- cluster.removeInstanceContext(event.getInstanceId());
- TopologyManager.updateTopology(topology);
- //publishing data
- ClusterInstanceTerminatedEvent clusterTerminatedEvent =
new ClusterInstanceTerminatedEvent(
- event.getAppId(), event.getServiceName(),
event.getClusterId(), event.getInstanceId());
+ try {
+ Topology topology = TopologyManager.getTopology();
+ Cluster cluster = topology.getService(event.getServiceName()).
+ getCluster(event.getClusterId());
-
TopologyEventPublisher.sendClusterTerminatedEvent(clusterTerminatedEvent);
- } else {
- log.error(String.format("Cluster state transition is not
valid: [cluster-id] %s " +
- " [instance-id] %s [current-status] %s
[status-requested] %s",
- event.getClusterId(), event.getInstanceId(),
- context.getStatus(), status));
- return;
- }
- } finally {
- TopologyManager.releaseWriteLock();
+ if (!cluster.isStateTransitionValid(ClusterStatus.Terminating,
event.getInstanceId())) {
+ log.error("Invalid state transfer from " +
cluster.getStatus(event.getInstanceId()) + " to " +
+ ClusterStatus.Terminating);
}
+ ClusterInstance context =
cluster.getInstanceContexts(event.getInstanceId());
+ if (context == null) {
+ log.warn("Cluster Instance Context is not found for [cluster]
" +
+ event.getClusterId() + " [instance-id] " +
+ event.getInstanceId());
+ return;
+ }
+ ClusterStatus status = ClusterStatus.Terminating;
+ if (context.isStateTransitionValid(status)) {
+ context.setStatus(status);
+ log.info("Cluster Terminating started for " +
cluster.getClusterId());
+ TopologyManager.updateTopology(topology);
+ //publishing data
+ ClusterInstanceTerminatingEvent clusterTerminaingEvent = new
ClusterInstanceTerminatingEvent(event.getAppId(),
+ event.getServiceName(), event.getClusterId(),
event.getInstanceId());
+
TopologyEventPublisher.sendClusterTerminatingEvent(clusterTerminaingEvent);
- }
-
- public static void handleClusterTerminatingEvent
(ClusterStatusClusterTerminatingEvent event){
-
- TopologyManager.acquireWriteLock();
-
- try {
- Topology topology = TopologyManager.getTopology();
- Cluster cluster = topology.getService(event.getServiceName()).
- getCluster(event.getClusterId());
-
- if (!cluster.isStateTransitionValid(ClusterStatus.Terminating,
event.getInstanceId())) {
- log.error("Invalid state transfer from " +
cluster.getStatus(event.getInstanceId()) + " to " +
- ClusterStatus.Terminating);
- }
- ClusterInstance context =
cluster.getInstanceContexts(event.getInstanceId());
- if (context == null) {
- log.warn("Cluster Instance Context is not found for
[cluster] " +
- event.getClusterId() + " [instance-id] " +
- event.getInstanceId());
- return;
- }
- ClusterStatus status = ClusterStatus.Terminating;
- if (context.isStateTransitionValid(status)) {
- context.setStatus(status);
- log.info("Cluster Terminating started for " +
cluster.getClusterId());
- TopologyManager.updateTopology(topology);
- //publishing data
- ClusterInstanceTerminatingEvent clusterTerminaingEvent =
new ClusterInstanceTerminatingEvent(
- event.getAppId(), event.getServiceName(),
event.getClusterId(), event.getInstanceId());
-
-
TopologyEventPublisher.sendClusterTerminatingEvent(clusterTerminaingEvent);
-
- // Remove kubernetes services if available
- ClusterContext clusterContext =
-
CloudControllerContext.getInstance().getClusterContext(event.getClusterId());
- if
(StringUtils.isNotBlank(clusterContext.getKubernetesClusterId())) {
-
KubernetesIaas.removeKubernetesServices(event.getAppId(), event.getClusterId());
- }
- } else {
- log.error(String.format("Cluster state transition is not
valid: [cluster-id] %s " +
- " [instance-id] %s [current-status] %s
[status-requested] %s",
- event.getClusterId(), event.getInstanceId(),
- context.getStatus(), status));
+ // Remove kubernetes services if available
+ ClusterContext clusterContext =
+
CloudControllerContext.getInstance().getClusterContext(event.getClusterId());
+ if
(StringUtils.isNotBlank(clusterContext.getKubernetesClusterId())) {
+ KubernetesIaas.removeKubernetesServices(event.getAppId(),
event.getClusterId());
}
- } finally {
- TopologyManager.releaseWriteLock();
+ } else {
+ log.error(String.format("Cluster state transition is not
valid: [cluster-id] %s " +
+ " [instance-id] %s [current-status] %s
[status-requested] %s",
+ event.getClusterId(), event.getInstanceId(),
+ context.getStatus(), status));
}
+ } finally {
+ TopologyManager.releaseWriteLock();
}
}
+}