Fix STRATOS-1578: Handle registry exceptions in higher levels to avoid inconsistencies in Cloud Controller context and Topology context
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/8d46fab0 Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/8d46fab0 Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/8d46fab0 Branch: refs/heads/stratos-4.1.x Commit: 8d46fab0f921e37d5f6da11f8006b68b3743226d Parents: 0fe27c7 Author: Akila Perera <[email protected]> Authored: Mon Oct 12 00:57:23 2015 +0530 Committer: Akila Perera <[email protected]> Committed: Mon Oct 12 00:57:23 2015 +0530 ---------------------------------------------------------------------- .../context/CloudControllerContext.java | 8 +- .../stratos/cloud/controller/iaases/Iaas.java | 9 +- .../cloud/controller/iaases/JcloudsIaas.java | 107 ++-- .../iaases/kubernetes/KubernetesIaas.java | 323 +++++------ .../application/ApplicationEventReceiver.java | 23 +- .../status/ClusterStatusTopicReceiver.java | 31 +- .../status/InstanceStatusTopicReceiver.java | 11 +- .../messaging/topology/TopologyBuilder.java | 579 +++++++------------ .../messaging/topology/TopologyManager.java | 12 +- .../impl/CloudControllerServiceImpl.java | 222 +++---- .../impl/CloudControllerServiceUtil.java | 6 +- .../services/impl/InstanceCreator.java | 9 +- .../controller/util/CloudControllerUtil.java | 8 - 13 files changed, 611 insertions(+), 737 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/8d46fab0/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java index f00a9f9..b935ffc 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java @@ -689,13 +689,9 @@ public class CloudControllerContext implements Serializable { this.coordinator = coordinator; } - public void persist() { + public void persist() throws RegistryException { if ((!isClustered()) || (isCoordinator())) { - try { - RegistryManager.getInstance().persist(CloudControllerConstants.DATA_RESOURCE, this); - } catch (RegistryException e) { - log.error("Could not persist cloud controller context in registry", e); - } + RegistryManager.getInstance().persist(CloudControllerConstants.DATA_RESOURCE, this); } } http://git-wip-us.apache.org/repos/asf/stratos/blob/8d46fab0/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/Iaas.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/Iaas.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/Iaas.java index 543f251..b4a1133 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/Iaas.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/Iaas.java @@ -61,7 +61,8 @@ public abstract class Iaas { * @param payload * @return updated memberContext */ - public abstract MemberContext startInstance(MemberContext memberContext, byte[] payload) throws CartridgeNotFoundException; + public abstract MemberContext startInstance(MemberContext memberContext, byte[] payload) + throws CartridgeNotFoundException; /** * This will deallocate/release the given IP address back to pool. @@ -117,7 +118,6 @@ public abstract class Iaas { */ public abstract String createVolume(int sizeGB, String snapshotId); - /** * Attach a given volume to an instance at the specified device path. * @@ -170,7 +170,8 @@ public abstract class Iaas { * @throws InvalidCartridgeTypeException * @throws InvalidMemberException */ - public abstract void terminateInstance(MemberContext memberContext) throws InvalidCartridgeTypeException, InvalidMemberException, MemberTerminationFailedException; + public abstract void terminateInstance(MemberContext memberContext) + throws InvalidCartridgeTypeException, InvalidMemberException, MemberTerminationFailedException; /** * Get the group name which will be used when creating a node via jclouds API @@ -178,7 +179,7 @@ public abstract class Iaas { * @param memberContext * @param payload */ - public String getGroupName(MemberContext memberContext, byte[] payload){ + public String getGroupName(MemberContext memberContext, byte[] payload) { String clusterId = memberContext.getClusterId(); String str = clusterId.length() > 10 ? clusterId.substring(0, 10) : clusterId.substring(0, clusterId.length()); String group = str.replaceAll("[^a-z0-9-]", ""); http://git-wip-us.apache.org/repos/asf/stratos/blob/8d46fab0/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/JcloudsIaas.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/JcloudsIaas.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/JcloudsIaas.java index 562ae81..55ccebe 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/JcloudsIaas.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/JcloudsIaas.java @@ -35,6 +35,7 @@ import org.jclouds.compute.domain.NodeMetadata; import org.jclouds.compute.domain.NodeMetadataBuilder; import org.jclouds.compute.domain.Template; import org.jclouds.rest.ResourceNotFoundException; +import org.wso2.carbon.registry.core.exceptions.RegistryException; import java.util.ArrayList; import java.util.List; @@ -52,9 +53,12 @@ public abstract class JcloudsIaas extends Iaas { } /** - * This should build the {@link org.jclouds.compute.ComputeService} object and the {@link org.jclouds.compute.domain.Template} object, - * using the information from {@link org.apache.stratos.cloud.controller.domain.IaasProvider} and should set the built - * {@link org.jclouds.compute.ComputeService} object in the {@link org.apache.stratos.cloud.controller.domain.IaasProvider#setComputeService(org.jclouds.compute.ComputeService)} + * This should build the {@link org.jclouds.compute.ComputeService} object and the {@link org.jclouds.compute + * .domain.Template} object, + * using the information from {@link org.apache.stratos.cloud.controller.domain.IaasProvider} and should set the + * built + * {@link org.jclouds.compute.ComputeService} object in the {@link org.apache.stratos.cloud.controller.domain + * .IaasProvider#setComputeService(org.jclouds.compute.ComputeService)} * and also should set the built {@link org.jclouds.compute.domain.Template} object in the * {@link org.apache.stratos.cloud.controller.domain.IaasProvider#setTemplate(org.jclouds.compute.domain.Template)}. */ @@ -66,7 +70,8 @@ public abstract class JcloudsIaas extends Iaas { public abstract void buildTemplate(); /** - * This method should create a Key Pair corresponds to a given public key in the respective region having the name given. + * This method should create a Key Pair corresponds to a given public key in the respective region having the + * name given. * Also should override the value of the key pair in the {@link org.jclouds.compute.domain.Template} of this IaaS. * * @param region region that the key pair will get created. @@ -85,7 +90,8 @@ public abstract class JcloudsIaas extends Iaas { public abstract List<String> associateAddresses(NodeMetadata node); /** - * This will obtain a predefined IP address and associate that IP with this node, if ip is already in use allocate ip from pool + * This will obtain a predefined IP address and associate that IP with this node, if ip is already in use + * allocate ip from pool * (through associateAddress()) * * @param node Node to be associated with an IP. @@ -117,27 +123,27 @@ public abstract class JcloudsIaas extends Iaas { Template template = getIaasProvider().getTemplate(); if (template == null) { - String msg = "Could not start an instance, jclouds template is null for iaas provider [type]: " + - getIaasProvider().getType(); + String msg = "Could not start an instance, jclouds template is null for iaas provider [type]: " + + getIaasProvider().getType(); log.error(msg); throw new InvalidIaasProviderException(msg); } if (log.isDebugEnabled()) { - log.debug("Cloud controller is delegating request to start an instance for " - + memberContext + " to jclouds"); + log.debug("Cloud controller is delegating request to start an instance for " + memberContext + + " to jclouds"); } // create and start a node Set<? extends NodeMetadata> nodeMetadataSet = computeService.createNodesInGroup(group, 1, template); NodeMetadata nodeMetadata = nodeMetadataSet.iterator().next(); if (log.isDebugEnabled()) { - log.debug("Cloud controller received a response for the request to start " - + memberContext + " from Jclouds layer."); + log.debug("Cloud controller received a response for the request to start " + memberContext + + " from Jclouds layer."); } if (nodeMetadata == null) { - String msg = "Null response received for instance start-up request to Jclouds.\n" - + memberContext.toString(); + String msg = "Null response received for instance start-up request to Jclouds.\n" + memberContext + .toString(); log.error(msg); throw new IllegalStateException(msg); } @@ -194,17 +200,15 @@ public abstract class JcloudsIaas extends Iaas { if (StringUtils.isNotBlank(preDefinedPublicIp)) { // Allocate predefined public ip if (log.isDebugEnabled()) { - log.debug(String.format("Allocating predefined public IP address: " + - "[cartridge-type] %s [member-id] %s [pre-defined-ip] %s", - memberContext.getCartridgeType(), memberContext.getMemberId(), - preDefinedPublicIp)); + log.debug(String.format("Allocating predefined public IP address: " + + "[cartridge-type] %s [member-id] %s [pre-defined-ip] %s", + memberContext.getCartridgeType(), memberContext.getMemberId(), preDefinedPublicIp)); } if (!CloudControllerServiceUtil.isValidIpAddress(preDefinedPublicIp)) { - String msg = String.format("Predefined public IP address is not valid: " + - "[cartridge-type] %s [member-id] %s [pre-defined-ip] %s", - memberContext.getCartridgeType(), memberContext.getMemberId(), - preDefinedPublicIp); + String msg = String.format("Predefined public IP address is not valid: " + + "[cartridge-type] %s [member-id] %s [pre-defined-ip] %s", + memberContext.getCartridgeType(), memberContext.getMemberId(), preDefinedPublicIp); log.error(msg); throw new CloudControllerException(msg); } @@ -213,9 +217,8 @@ public abstract class JcloudsIaas extends Iaas { if ((StringUtils.isBlank(allocatedIp)) || (!preDefinedPublicIp.equals(allocatedIp))) { String msg = String.format("Could not allocate predefined public IP address: " + "[cartridge-type] %s [member-id] %s " + - "[pre-defined-ip] %s [allocated-ip] %s", - memberContext.getCartridgeType(), memberContext.getMemberId(), - preDefinedPublicIp, allocatedIp); + "[pre-defined-ip] %s [allocated-ip] %s", memberContext.getCartridgeType(), + memberContext.getMemberId(), preDefinedPublicIp, allocatedIp); log.error(msg); throw new CloudControllerException(msg); } @@ -223,8 +226,8 @@ public abstract class JcloudsIaas extends Iaas { } else { // Allocate dynamic public ip addresses if (log.isDebugEnabled()) { - log.debug(String.format("Allocating dynamic public IP addresses: " + - "[cartridge-type] %s [member-id] %s", + log.debug(String.format( + "Allocating dynamic public IP addresses: " + "[cartridge-type] %s [member-id] %s", memberContext.getCartridgeType(), memberContext.getMemberId())); } @@ -233,25 +236,24 @@ public abstract class JcloudsIaas extends Iaas { // checking for null and empty is enough. If there are elements in this list, they are valid IPs // because we are validating before putting into the list if (associatedIPs == null || associatedIPs.isEmpty()) { - String msg = String.format("Could not allocate dynamic public IP addresses: " + - "[cartridge-type] %s [member-id] %s", - memberContext.getCartridgeType(), memberContext.getMemberId(), - preDefinedPublicIp); + String msg = String.format("Could not allocate dynamic public IP addresses: " + + "[cartridge-type] %s [member-id] %s", memberContext.getCartridgeType(), + memberContext.getMemberId(), preDefinedPublicIp); log.error(msg); throw new CloudControllerException(msg); } } memberContext.setAllocatedIPs(associatedIPs.toArray(new String[associatedIPs.size()])); - log.info(String.format("IP addresses allocated to member: [cartridge-type] %s [member-id] %s " + - "[allocated-ip-addresses] %s ", memberContext.getCartridgeType(), + log.info(String.format("IP addresses allocated to member: [cartridge-type] %s [member-id] %s " + + "[allocated-ip-addresses] %s ", memberContext.getCartridgeType(), memberContext.getMemberId(), memberContext.getAllocatedIPs())); // build the node with the new ip - nodeMetadata = NodeMetadataBuilder.fromNodeMetadata(nodeMetadata).publicAddresses(associatedIPs).build(); + nodeMetadata = NodeMetadataBuilder.fromNodeMetadata(nodeMetadata).publicAddresses(associatedIPs) + .build(); } - // public IPs Set<String> publicIPAddresses = nodeMetadata.getPublicAddresses(); if (publicIPAddresses != null && !publicIPAddresses.isEmpty()) { @@ -278,42 +280,44 @@ public abstract class JcloudsIaas extends Iaas { log.debug("IP allocation process ended for " + memberContext); } } catch (Exception e) { - String msg = String.format("Error occurred while allocating ip addresses: [cartridge-type] %s " + - "[member-id] %s", memberContext.getCartridgeType(), memberContext.getMemberId()); + String msg = String + .format("Error occurred while allocating ip addresses: [cartridge-type] %s " + "[member-id] %s", + memberContext.getCartridgeType(), memberContext.getMemberId()); log.error(msg, e); throw new CloudControllerException(msg, e); } } - public void terminateInstance(MemberContext memberContext) throws InvalidCartridgeTypeException, - InvalidMemberException { + public void terminateInstance(MemberContext memberContext) + throws InvalidCartridgeTypeException, InvalidMemberException { String memberId = memberContext.getMemberId(); String cartridgeType = memberContext.getCartridgeType(); String nodeId = memberContext.getInstanceId(); Cartridge cartridge = CloudControllerContext.getInstance().getCartridge(cartridgeType); if (log.isInfoEnabled()) { - log.info(String.format("Starting to terminate member: [cartridge-type] %s [member-id] %s", - cartridgeType, memberId)); + log.info(String.format("Starting to terminate member: [cartridge-type] %s [member-id] %s", cartridgeType, + memberId)); } if (cartridge == null) { - String msg = String.format("Member termination failed, could not find cartridge in cloud controller " + - "context: [cartridge-type] %s [member-id] %s", - cartridgeType, memberId); + String msg = String.format("Member termination failed, could not find cartridge in cloud controller " + + "context: [cartridge-type] %s [member-id] %s", cartridgeType, memberId); log.error(msg); throw new InvalidCartridgeTypeException(msg); } // if no matching node id can be found. if (nodeId == null) { - String msg = String.format("Member termination failed, could not find node id in member context: " + - "[cartridge-type] %s [member-id] %s", - cartridgeType, memberId); - - // Execute member termination post process - CloudControllerServiceUtil.executeMemberTerminationPostProcess(memberContext); + String msg = String.format("Member termination failed, could not find node id in member context: " + + "[cartridge-type] %s [member-id] %s", cartridgeType, memberId); log.error(msg); + // Execute member termination post process + try { + CloudControllerServiceUtil.executeMemberTerminationPostProcess(memberContext); + } catch (RegistryException e) { + log.error("Could not persist data in registry data store", e); + } throw new InvalidMemberException(msg); } @@ -352,7 +356,9 @@ public abstract class JcloudsIaas extends Iaas { String clusterId = ctxt.getClusterId(); ClusterContext clusterContext = CloudControllerContext.getInstance().getClusterContext(clusterId); if (clusterContext == null) { - log.error(String.format("Could not detach volume, Cluster context not found for the [member] %s [cluster-id]", ctxt.getMemberId(), clusterId)); + log.error(String.format( + "Could not detach volume, Cluster context not found for the [member] %s [cluster-id] %s", + ctxt.getMemberId(), clusterId)); return; } @@ -376,7 +382,6 @@ public abstract class JcloudsIaas extends Iaas { } } - public NodeMetadata findNodeMetadata(String nodeId) { ComputeService computeService = getIaasProvider().getComputeService(); return computeService.getNodeMetadata(nodeId); http://git-wip-us.apache.org/repos/asf/stratos/blob/8d46fab0/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/kubernetes/KubernetesIaas.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/kubernetes/KubernetesIaas.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/kubernetes/KubernetesIaas.java index d330dde..4048390 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/kubernetes/KubernetesIaas.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/kubernetes/KubernetesIaas.java @@ -44,6 +44,7 @@ import org.apache.stratos.kubernetes.client.KubernetesApiClient; import org.apache.stratos.kubernetes.client.KubernetesConstants; import org.apache.stratos.kubernetes.client.exceptions.KubernetesClientException; import org.apache.stratos.messaging.domain.topology.KubernetesService; +import org.wso2.carbon.registry.core.exceptions.RegistryException; import java.util.*; import java.util.concurrent.locks.Lock; @@ -132,9 +133,14 @@ public class KubernetesIaas extends Iaas { } @Override - public void terminateInstance(MemberContext memberContext) throws InvalidCartridgeTypeException, - InvalidMemberException, MemberTerminationFailedException { - terminateContainer(memberContext); + public void terminateInstance(MemberContext memberContext) + throws InvalidCartridgeTypeException, InvalidMemberException, MemberTerminationFailedException { + try { + terminateContainer(memberContext); + } catch (RegistryException e) { + log.error(String.format("Could not persist data while terminating container for member [member-id] %s", + memberContext.getMemberId()), e); + } } /** @@ -144,16 +150,14 @@ public class KubernetesIaas extends Iaas { * @return * @throws CartridgeNotFoundException */ - public MemberContext startContainer(MemberContext memberContext) - throws CartridgeNotFoundException { + public MemberContext startContainer(MemberContext memberContext) throws CartridgeNotFoundException { Lock lock = null; try { lock = CloudControllerContext.getInstance().acquireMemberContextWriteLock(); handleNullObject(memberContext, "member context is null"); log.info(String.format("Starting container: [application] %s [cartridge] %s [member] %s", - memberContext.getApplicationId(), memberContext.getCartridgeType(), - memberContext.getMemberId())); + memberContext.getApplicationId(), memberContext.getCartridgeType(), memberContext.getMemberId())); // Validate cluster id String clusterId = memberContext.getClusterId(); @@ -163,16 +167,14 @@ public class KubernetesIaas extends Iaas { // Validate cluster context ClusterContext clusterContext = CloudControllerContext.getInstance().getClusterContext(clusterId); handleNullObject(clusterContext, - String.format("Cluster context not found: [application] %s [cartridge] %s " + - "[cluster] %s", memberContext.getApplicationId(), memberContext.getCartridgeType(), - clusterId)); + String.format("Cluster context not found: [application] %s [cartridge] %s " + "[cluster] %s", + memberContext.getApplicationId(), memberContext.getCartridgeType(), clusterId)); // Validate partition Partition partition = memberContext.getPartition(); - handleNullObject(partition, String.format("partition not found in member context: [application] %s " + - "[cartridge] %s [member] %s", memberContext.getApplicationId(), - memberContext.getCartridgeType(), - memberContext.getMemberId())); + handleNullObject(partition, String.format( + "partition not found in member context: [application] %s " + "[cartridge] %s [member] %s", + memberContext.getApplicationId(), memberContext.getCartridgeType(), memberContext.getMemberId())); // Validate cartridge String cartridgeType = clusterContext.getCartridgeType(); @@ -195,13 +197,13 @@ public class KubernetesIaas extends Iaas { // Prepare kubernetes context String kubernetesMasterIp = kubernetesCluster.getKubernetesMaster().getPrivateIPAddress(); PortRange kubernetesPortRange = kubernetesCluster.getPortRange(); - String kubernetesMasterPort = CloudControllerUtil.getProperty( - kubernetesCluster.getKubernetesMaster().getProperties(), StratosConstants.KUBERNETES_MASTER_PORT, - StratosConstants.KUBERNETES_MASTER_DEFAULT_PORT); + String kubernetesMasterPort = CloudControllerUtil + .getProperty(kubernetesCluster.getKubernetesMaster().getProperties(), + StratosConstants.KUBERNETES_MASTER_PORT, StratosConstants.KUBERNETES_MASTER_DEFAULT_PORT); // Add kubernetes cluster payload parameters to payload - if ((kubernetesCluster.getProperties() != null) && - (kubernetesCluster.getProperties().getProperties() != null)) { + if ((kubernetesCluster.getProperties() != null) && (kubernetesCluster.getProperties().getProperties() + != null)) { for (Property property : kubernetesCluster.getProperties().getProperties()) { if (property != null) { if (property.getName().startsWith(PAYLOAD_PARAMETER_PREFIX)) { @@ -234,21 +236,17 @@ public class KubernetesIaas extends Iaas { // Update member context updateMemberContext(memberContext, pod, kubernetesCluster); - log.info(String.format("Container started successfully: [application] %s [cartridge] %s [member] %s " + - "[pod] %s [cpu] %s [memory] %s", - memberContext.getApplicationId(), memberContext.getCartridgeType(), - memberContext.getMemberId(), memberContext.getKubernetesPodId(), + log.info(String.format("Container started successfully: [application] %s [cartridge] %s [member] %s " + + "[pod] %s [cpu] %s [memory] %s", memberContext.getApplicationId(), + memberContext.getCartridgeType(), memberContext.getMemberId(), memberContext.getKubernetesPodId(), memberContext.getInstanceMetadata().getCpu(), memberContext.getInstanceMetadata().getRam())); return memberContext; - } - catch (Exception e) { + } catch (Exception e) { String msg = String.format("Could not start container: [application] %s [cartridge] %s [member] %s", - memberContext.getApplicationId(), memberContext.getCartridgeType(), - memberContext.getMemberId()); + memberContext.getApplicationId(), memberContext.getCartridgeType(), memberContext.getMemberId()); log.error(msg, e); throw new RuntimeException(msg, e); - } - finally { + } finally { if (lock != null) { CloudControllerContext.getInstance().releaseWriteLock(lock); } @@ -265,16 +263,16 @@ public class KubernetesIaas extends Iaas { if (StringUtils.isNotBlank(kubernetesHostPublicIP)) { memberPublicIPAddress = kubernetesHostPublicIP; if (log.isInfoEnabled()) { - log.info(String.format("Member public IP address set to kubernetes host public IP address:" + - "[pod-host-ip] %s [kubernetes-host-public-ip] %s", podHostIPAddress, kubernetesHostPublicIP)); + log.info(String.format("Member public IP address set to kubernetes host public IP address:" + + "[pod-host-ip] %s [kubernetes-host-public-ip] %s", podHostIPAddress, kubernetesHostPublicIP)); } } memberContext.setInstanceId(pod.getMetadata().getName()); memberContext.setDefaultPrivateIP(memberPrivateIPAddress); - memberContext.setPrivateIPs(new String[]{memberPrivateIPAddress}); + memberContext.setPrivateIPs(new String[] { memberPrivateIPAddress }); memberContext.setDefaultPublicIP(memberPublicIPAddress); - memberContext.setPublicIPs(new String[]{memberPublicIPAddress}); + memberContext.setPublicIPs(new String[] { memberPublicIPAddress }); memberContext.setInitTime(memberContext.getInitTime()); memberContext.setProperties(memberContext.getProperties()); } @@ -306,20 +304,20 @@ public class KubernetesIaas extends Iaas { podCreated = true; if (pod.getStatus().getPhase().equals(KubernetesConstants.POD_STATUS_RUNNING)) { log.info(String.format( - "Pod status changed to running: [application] %s [cartridge] %s [member] %s " + - "[pod] %s", memberContext.getApplicationId(), memberContext.getCartridgeType(), + "Pod status changed to running: [application] %s [cartridge] %s [member] %s " + "[pod] %s", + memberContext.getApplicationId(), memberContext.getCartridgeType(), memberContext.getMemberId(), pod.getMetadata().getName())); return pod; } else { - log.info(String.format("Waiting pod status to be changed to running: [application] %s " + - "[cartridge] %s [member] %s [pod] %s", memberContext.getApplicationId(), + log.info(String.format("Waiting pod status to be changed to running: [application] %s " + + "[cartridge] %s [member] %s [pod] %s", memberContext.getApplicationId(), memberContext.getCartridgeType(), memberContext.getMemberId(), pod.getMetadata().getName())); } } else { - log.info(String.format("Waiting for pod to be created: [application] %s " + - "[cartridge] %s [member] %s [pod] %s", memberContext.getApplicationId(), - memberContext.getCartridgeType(), memberContext.getMemberId(), + log.info(String.format( + "Waiting for pod to be created: [application] %s " + "[cartridge] %s [member] %s [pod] %s", + memberContext.getApplicationId(), memberContext.getCartridgeType(), memberContext.getMemberId(), memberContext.getKubernetesPodId())); } @@ -332,19 +330,17 @@ public class KubernetesIaas extends Iaas { String message; if (podCreated) { // Pod created but status did not change to running - message = String.format("Pod status did not change to running within %d sec: " + - "[application] %s [cartridge] %s [member] %s [pod] %s", - (podActivationTimeout.intValue() / 1000), - memberContext.getApplicationId(), memberContext.getCartridgeType(), memberContext.getMemberId(), - memberContext.getKubernetesPodId()); + message = String.format("Pod status did not change to running within %d sec: " + + "[application] %s [cartridge] %s [member] %s [pod] %s", + (podActivationTimeout.intValue() / 1000), memberContext.getApplicationId(), + memberContext.getCartridgeType(), memberContext.getMemberId(), memberContext.getKubernetesPodId()); log.error(message); } else { // Pod did not create - message = String.format("Pod did not create within %d sec: " + - "[application] %s [cartridge] %s [member] %s [pod] %s", - (podActivationTimeout.intValue() / 1000), - memberContext.getApplicationId(), memberContext.getCartridgeType(), memberContext.getMemberId(), - memberContext.getKubernetesPodId()); + message = String.format("Pod did not create within %d sec: " + + "[application] %s [cartridge] %s [member] %s [pod] %s", + (podActivationTimeout.intValue() / 1000), memberContext.getApplicationId(), + memberContext.getCartridgeType(), memberContext.getMemberId(), memberContext.getKubernetesPodId()); log.error(message); } @@ -360,8 +356,8 @@ public class KubernetesIaas extends Iaas { * @throws KubernetesClientException */ private void createPod(ClusterContext clusterContext, MemberContext memberContext, - KubernetesApiClient kubernetesApi, KubernetesClusterContext kubernetesClusterContext) - throws KubernetesClientException { + KubernetesApiClient kubernetesApi, KubernetesClusterContext kubernetesClusterContext) + throws KubernetesClientException, RegistryException { String applicationId = memberContext.getApplicationId(); String cartridgeType = memberContext.getCartridgeType(); @@ -369,15 +365,16 @@ public class KubernetesIaas extends Iaas { String memberId = memberContext.getMemberId(); if (log.isInfoEnabled()) { - log.info(String.format("Creating kubernetes pod: [application] %s [cartridge] %s [member] %s", - applicationId, cartridgeType, memberId)); + log.info( + String.format("Creating kubernetes pod: [application] %s [cartridge] %s [member] %s", applicationId, + cartridgeType, memberId)); } Partition partition = memberContext.getPartition(); if (partition == null) { - String message = String.format("Partition not found in member context: [application] %s [cartridge] %s " + - "[member] %s ", applicationId, cartridgeType, - memberId); + String message = String + .format("Partition not found in member context: [application] %s [cartridge] %s " + "[member] %s ", + applicationId, cartridgeType, memberId); log.error(message); throw new RuntimeException(message); } @@ -401,8 +398,8 @@ public class KubernetesIaas extends Iaas { memory = memoryProperty.getValue(); } - IaasProvider iaasProvider = - CloudControllerContext.getInstance().getIaasProviderOfPartition(cartridge.getType(), partition.getId()); + IaasProvider iaasProvider = CloudControllerContext.getInstance() + .getIaasProviderOfPartition(cartridge.getType(), partition.getId()); if (iaasProvider == null) { String message = "Could not find iaas provider: [partition] " + partition.getId(); log.error(message); @@ -423,15 +420,14 @@ public class KubernetesIaas extends Iaas { // Create pod String podName = DigestUtils.md5Hex(clusterId); String dockerImage = iaasProvider.getImage(); - List<EnvVar> environmentVariables = KubernetesIaasUtil.prepareEnvironmentVariables( - clusterContext, memberContext); + List<EnvVar> environmentVariables = KubernetesIaasUtil + .prepareEnvironmentVariables(clusterContext, memberContext); List<ContainerPort> ports = KubernetesIaasUtil.convertPortMappings(Arrays.asList(cartridge.getPortMappings())); - log.info(String.format("Starting pod: [application] %s [cartridge] %s [member] %s " + - "[cpu] %s [memory] %s", - memberContext.getApplicationId(), memberContext.getCartridgeType(), - memberContext.getMemberId(), cpu, memory)); + log.info(String.format("Starting pod: [application] %s [cartridge] %s [member] %s " + "[cpu] %s [memory] %s", + memberContext.getApplicationId(), memberContext.getCartridgeType(), memberContext.getMemberId(), cpu, + memory)); Map<String, String> podLabels = new HashMap<>(); podLabels.put(KubernetesConstants.SERVICE_SELECTOR_LABEL, podName); @@ -455,10 +451,9 @@ public class KubernetesIaas extends Iaas { kubernetesApi.createPod(podId, podName, podLabels, podAnnotations, dockerImage, cpu, memory, ports, environmentVariables); - log.info(String.format("Pod started successfully: [application] %s [cartridge] %s [member] %s " + - "[pod] %s [pod-label] %s [cpu] %s [memory] %s", - memberContext.getApplicationId(), memberContext.getCartridgeType(), - memberContext.getMemberId(), podId, podName, cpu, memory)); + log.info(String.format("Pod started successfully: [application] %s [cartridge] %s [member] %s " + + "[pod] %s [pod-label] %s [cpu] %s [memory] %s", memberContext.getApplicationId(), + memberContext.getCartridgeType(), memberContext.getMemberId(), podId, podName, cpu, memory)); // Add pod id to member context memberContext.setKubernetesPodId(podId); @@ -489,15 +484,13 @@ public class KubernetesIaas extends Iaas { * @throws KubernetesClientException */ private void createKubernetesServices(KubernetesApiClient kubernetesApi, ClusterContext clusterContext, - KubernetesCluster kubernetesCluster, KubernetesClusterContext - kubernetesClusterContext, MemberContext memberContext) - throws KubernetesClientException { + KubernetesCluster kubernetesCluster, KubernetesClusterContext kubernetesClusterContext, + MemberContext memberContext) throws KubernetesClientException, RegistryException { String clusterId = clusterContext.getClusterId(); String cartridgeType = clusterContext.getCartridgeType(); Cartridge cartridge = CloudControllerContext.getInstance().getCartridge(cartridgeType); if (cartridge == null) { - String message = "Could not create kubernetes services, cartridge not found: [cartridge] " + - cartridgeType; + String message = "Could not create kubernetes services, cartridge not found: [cartridge] " + cartridgeType; log.error(message); throw new RuntimeException(message); } @@ -523,27 +516,25 @@ public class KubernetesIaas extends Iaas { } String serviceName = DigestUtils.md5Hex(clusterId); - Collection<KubernetesService> kubernetesServices = - clusterContext.getKubernetesServices(memberContext.getClusterInstanceId()); + Collection<KubernetesService> kubernetesServices = clusterContext + .getKubernetesServices(memberContext.getClusterInstanceId()); for (ClusterPortMapping clusterPortMapping : clusterPortMappings) { // Skip if already created int containerPort = clusterPortMapping.getPort(); KubernetesService existingService = findKubernetesService(kubernetesServices, containerPort); - if ((existingService != null) && serviceExistsInCluster( - existingService.getId(), kubernetesClusterContext, + if ((existingService != null) && serviceExistsInCluster(existingService.getId(), kubernetesClusterContext, memberContext, clusterPortMapping.getName())) { - log.info(String.format("Kubernetes service already exists: [kubernetes-cluster] %s " + - "[cluster] %s [service-name] %s [container-port] %d ", + log.info(String.format("Kubernetes service already exists: [kubernetes-cluster] %s " + + "[cluster] %s [service-name] %s [container-port] %d ", kubernetesCluster.getClusterId(), clusterId, serviceName, containerPort)); continue; } // Find next available service sequence number long serviceSeqNo = kubernetesClusterContext.getNextServiceSeqNo(); - String serviceId = - KubernetesIaasUtil.fixSpecialCharacters(prepareServiceName(serviceSeqNo)); + String serviceId = KubernetesIaasUtil.fixSpecialCharacters(prepareServiceName(serviceSeqNo)); while (kubernetesApi.getService(serviceId) != null) { serviceSeqNo = kubernetesClusterContext.getNextServiceSeqNo(); serviceId = KubernetesIaasUtil.fixSpecialCharacters(prepareServiceName(serviceSeqNo)); @@ -551,9 +542,9 @@ public class KubernetesIaas extends Iaas { if (log.isInfoEnabled()) { log.info(String.format("Creating kubernetes service: [cluster] %s [service-id] %s [service-name] " + - "%s " + "[protocol] %s [service-port] %d [container-port] %s", clusterId, - serviceId, serviceName, clusterPortMapping.getProtocol(), - clusterPortMapping.getKubernetesServicePort(), containerPort)); + "%s " + "[protocol] %s [service-port] %d [container-port] %s", clusterId, serviceId, + serviceName, clusterPortMapping.getProtocol(), clusterPortMapping.getKubernetesServicePort(), + containerPort)); } // Create kubernetes service for port mapping @@ -573,28 +564,26 @@ public class KubernetesIaas extends Iaas { trimLabel(CloudControllerConstants.PORT_NAME_LABEL, clusterPortMapping.getName())); Map<String, String> serviceAnnotations = new HashMap<>(); - serviceAnnotations - .put(CloudControllerConstants.APPLICATION_ID_LABEL, clusterContext.getApplicationId()); + serviceAnnotations.put(CloudControllerConstants.APPLICATION_ID_LABEL, clusterContext.getApplicationId()); serviceAnnotations.put(CloudControllerConstants.CLUSTER_ID_LABEL, clusterContext.getClusterId()); - serviceAnnotations.put(CloudControllerConstants.CLUSTER_INSTANCE_ID_LABEL, - memberContext.getClusterInstanceId()); + serviceAnnotations + .put(CloudControllerConstants.CLUSTER_INSTANCE_ID_LABEL, memberContext.getClusterInstanceId()); serviceAnnotations.put(CloudControllerConstants.PORT_NAME_LABEL, clusterPortMapping.getName()); serviceAnnotations.put(CloudControllerConstants.PROTOCOL_LABEL, clusterPortMapping.getProtocol()); - serviceAnnotations.put(CloudControllerConstants.PORT_TYPE_LABEL, - clusterPortMapping.getKubernetesPortType()); - serviceAnnotations.put(CloudControllerConstants.SERVICE_PORT_LABEL, String.valueOf(clusterPortMapping - .getKubernetesServicePort())); serviceAnnotations - .put(CloudControllerConstants.PORT_LABEL, String.valueOf(clusterPortMapping.getPort())); - serviceAnnotations.put(CloudControllerConstants.PROXY_PORT_LABEL, - String.valueOf(clusterPortMapping.getProxyPort())); + .put(CloudControllerConstants.PORT_TYPE_LABEL, clusterPortMapping.getKubernetesPortType()); + serviceAnnotations.put(CloudControllerConstants.SERVICE_PORT_LABEL, + String.valueOf(clusterPortMapping.getKubernetesServicePort())); + serviceAnnotations.put(CloudControllerConstants.PORT_LABEL, String.valueOf(clusterPortMapping.getPort())); + serviceAnnotations + .put(CloudControllerConstants.PROXY_PORT_LABEL, String.valueOf(clusterPortMapping.getProxyPort())); - kubernetesApi.createService(serviceId, serviceName, serviceLabels, serviceAnnotations, servicePort, - serviceType, containerPortName, containerPort, sessionAffinity); + kubernetesApi + .createService(serviceId, serviceName, serviceLabels, serviceAnnotations, servicePort, serviceType, + containerPortName, containerPort, sessionAffinity); try { Thread.sleep(1000); - } - catch (InterruptedException ignore) { + } catch (InterruptedException ignore) { } Service service = kubernetesApi.getService(serviceId); @@ -628,8 +617,8 @@ public class KubernetesIaas extends Iaas { if (log.isInfoEnabled()) { log.info(String.format( - "Kubernetes service successfully created: [cluster] %s [service-id] %s [protocol] %s " + - "[node-port] %d [container-port] %s", clusterId, serviceId, + "Kubernetes service successfully created: [cluster] %s [service-id] %s [protocol] %s " + + "[node-port] %d [container-port] %s", clusterId, serviceId, clusterPortMapping.getProtocol(), servicePort, containerPort)); } } @@ -646,8 +635,7 @@ public class KubernetesIaas extends Iaas { * @throws KubernetesClientException */ private boolean serviceExistsInCluster(String serviceId, KubernetesClusterContext kubernetesClusterContext, - MemberContext memberContext, String portName) - throws KubernetesClientException { + MemberContext memberContext, String portName) throws KubernetesClientException { KubernetesApiClient kubernetesApi = kubernetesClusterContext.getKubApi(); Service service = kubernetesApi.getService(serviceId); @@ -663,8 +651,7 @@ public class KubernetesIaas extends Iaas { StringUtils.isNotEmpty(portNameLabel) && applicationIdLabel.equals(memberContext.getApplicationId()) && clusterInstanceIdLabel.equals(memberContext.getClusterInstanceId()) && - portNameLabel.equals(portName) - ); + portNameLabel.equals(portName)); } return false; } @@ -672,8 +659,8 @@ public class KubernetesIaas extends Iaas { private String trimLabel(String key, String value) { if (StringUtils.isNotEmpty(value) && (value.length() > KubernetesConstants.MAX_LABEL_LENGTH)) { String trimmed = value.substring(0, KubernetesConstants.MAX_LABEL_LENGTH - 2).concat("X"); - log.warn(String.format("Kubernetes label trimmed: [key] %s [original] %s [trimmed] %s", - key, value, trimmed)); + log.warn(String.format("Kubernetes label trimmed: [key] %s [original] %s [trimmed] %s", key, value, + trimmed)); return trimmed; } return value; @@ -687,8 +674,8 @@ public class KubernetesIaas extends Iaas { List<String> minionPublicIPList = new ArrayList<String>(); KubernetesHost[] kubernetesHosts = kubernetesCluster.getKubernetesHosts(); if ((kubernetesHosts == null) || (kubernetesHosts.length == 0) || (kubernetesHosts[0] == null)) { - throw new RuntimeException("Hosts not found in kubernetes cluster: [cluster] " - + kubernetesCluster.getClusterId()); + throw new RuntimeException( + "Hosts not found in kubernetes cluster: [cluster] " + kubernetesCluster.getClusterId()); } for (KubernetesHost host : kubernetesHosts) { if (host != null) { @@ -706,7 +693,7 @@ public class KubernetesIaas extends Iaas { * @return */ private KubernetesService findKubernetesService(Collection<KubernetesService> kubernetesServices, - int containerPort) { + int containerPort) { if (kubernetesServices != null) { for (KubernetesService kubernetesService : kubernetesServices) { @@ -726,31 +713,32 @@ public class KubernetesIaas extends Iaas { * @param cartridge */ private void generateKubernetesServicePorts(String applicationId, String clusterId, - KubernetesClusterContext kubernetesClusterContext, - Cartridge cartridge) throws KubernetesClientException { + KubernetesClusterContext kubernetesClusterContext, Cartridge cartridge) + throws KubernetesClientException, RegistryException { synchronized (KubernetesIaas.class) { if (cartridge != null) { StringBuilder portMappingStrBuilder = new StringBuilder(); for (PortMapping portMapping : Arrays.asList(cartridge.getPortMappings())) { - Collection<ClusterPortMapping> clusterPortMappings = - CloudControllerContext.getInstance().getClusterPortMappings(applicationId, clusterId); + Collection<ClusterPortMapping> clusterPortMappings = CloudControllerContext.getInstance() + .getClusterPortMappings(applicationId, clusterId); if (clusterPortMappings == null) { - throw new CloudControllerException(String.format("Cluster port mappings not found: " + - "[application-id] %s [cluster-id] %s", applicationId, clusterId)); + throw new CloudControllerException(String.format( + "Cluster port mappings not found: " + "[application-id] %s [cluster-id] %s", + applicationId, clusterId)); } ClusterPortMapping clusterPortMapping = findClusterPortMapping(clusterPortMappings, portMapping); if (clusterPortMapping == null) { - throw new CloudControllerException(String.format("Cluster port mapping not found: " + - "[application-id] %s [cluster-id] %s [transport] %s", applicationId, clusterId, - portMapping.getName())); + throw new CloudControllerException(String.format("Cluster port mapping not found: " + + "[application-id] %s [cluster-id] %s [transport] %s", applicationId, + clusterId, portMapping.getName())); } if (clusterPortMapping.getKubernetesPortType() == null) { - throw new CloudControllerException(String.format("Kubernetes service type not " + - "found [application-id] %s [cluster-id] %s [cartridge] %s", applicationId, + throw new CloudControllerException(String.format("Kubernetes service type not " + + "found [application-id] %s [cluster-id] %s [cartridge] %s", applicationId, clusterId, cartridge)); } @@ -763,8 +751,8 @@ public class KubernetesIaas extends Iaas { int nextServicePort = kubernetesClusterContext.getNextServicePort(); if (nextServicePort == -1) { throw new RuntimeException( - String.format("Could not generate service port: [cluster-id] %s " + - "[port] %d", clusterId, portMapping.getPort())); + String.format("Could not generate service port: [cluster-id] %s " + "[port] %d", + clusterId, portMapping.getPort())); } // Find next available service port @@ -780,10 +768,9 @@ public class KubernetesIaas extends Iaas { } } else { if (log.isDebugEnabled()) { - log.debug(String.format("Kubernetes service port is already set: [application-id] %s " + - "[cluster-id] %s [port] %d [service-port] %d", - applicationId, clusterId, clusterPortMapping.getPort(), - clusterPortMapping.getKubernetesServicePort())); + log.debug(String.format("Kubernetes service port is already set: [application-id] %s " + + "[cluster-id] %s [port] %d [service-port] %d", applicationId, clusterId, + clusterPortMapping.getPort(), clusterPortMapping.getKubernetesServicePort())); } } @@ -791,16 +778,15 @@ public class KubernetesIaas extends Iaas { if (portMappingStrBuilder.toString().length() > 0) { portMappingStrBuilder.append(";"); } - portMappingStrBuilder.append(String.format("NAME:%s|PROTOCOL:%s|PORT:%d|PROXY_PORT:%d|TYPE:%s", - clusterPortMapping.getName(), clusterPortMapping.getProtocol(), - clusterPortMapping.getKubernetesServicePort(), clusterPortMapping.getProxyPort(), - clusterPortMapping.getKubernetesPortType())); + portMappingStrBuilder.append(String + .format("NAME:%s|PROTOCOL:%s|PORT:%d|PROXY_PORT:%d|TYPE:%s", clusterPortMapping.getName(), + clusterPortMapping.getProtocol(), clusterPortMapping.getKubernetesServicePort(), + clusterPortMapping.getProxyPort(), clusterPortMapping.getKubernetesPortType())); if (log.isInfoEnabled()) { - log.info(String.format("Kubernetes service port generated: [application-id] %s " + - "[cluster-id] %s [port] %d [service-port] %d", - applicationId, clusterId, clusterPortMapping.getPort(), - clusterPortMapping.getKubernetesServicePort())); + log.info(String.format("Kubernetes service port generated: [application-id] %s " + + "[cluster-id] %s [port] %d [service-port] %d", applicationId, clusterId, + clusterPortMapping.getPort(), clusterPortMapping.getKubernetesServicePort())); } } @@ -813,8 +799,7 @@ public class KubernetesIaas extends Iaas { } } - private boolean nodePortAvailable(List<Service> services, int nodePort) - throws KubernetesClientException { + private boolean nodePortAvailable(List<Service> services, int nodePort) throws KubernetesClientException { for (Service service : services) { for (ServicePort servicePort : service.getSpec().getPorts()) { @@ -826,7 +811,6 @@ public class KubernetesIaas extends Iaas { return true; } - /** * Find cluster port mapping that corresponds to cartridge port mapping. * @@ -835,7 +819,7 @@ public class KubernetesIaas extends Iaas { * @return */ private ClusterPortMapping findClusterPortMapping(Collection<ClusterPortMapping> clusterPortMappings, - PortMapping portMapping) { + PortMapping portMapping) { for (ClusterPortMapping clusterPortMapping : clusterPortMappings) { if (clusterPortMapping.getName().equals(portMapping.getName())) { return clusterPortMapping; @@ -851,7 +835,8 @@ public class KubernetesIaas extends Iaas { * @return * @throws MemberTerminationFailedException */ - public MemberContext terminateContainer(MemberContext memberContext) throws MemberTerminationFailedException { + public MemberContext terminateContainer(MemberContext memberContext) + throws MemberTerminationFailedException, RegistryException { Lock lock = null; try { lock = CloudControllerContext.getInstance().acquireMemberContextWriteLock(); @@ -859,23 +844,23 @@ public class KubernetesIaas extends Iaas { Partition partition = memberContext.getPartition(); if (partition == null) { - String message = String.format("Partition not found in member context: [member] %s ", - memberContext.getMemberId()); + String message = String + .format("Partition not found in member context: [member] %s ", memberContext.getMemberId()); log.error(message); throw new RuntimeException(message); } String kubernetesClusterId = memberContext.getPartition().getKubernetesClusterId(); - handleNullObject(kubernetesClusterId, String.format("Could not terminate container, kubernetes cluster " + - "context id is null: [partition-id] %s [member-id] %s", partition.getId(), + handleNullObject(kubernetesClusterId, String.format("Could not terminate container, kubernetes cluster " + + "context id is null: [partition-id] %s [member-id] %s", partition.getId(), memberContext.getMemberId())); - KubernetesClusterContext kubernetesClusterContext = - CloudControllerContext.getInstance().getKubernetesClusterContext(kubernetesClusterId); - handleNullObject(kubernetesClusterContext, - String.format("Could not terminate container, kubernetes cluster " + - "context not found: [partition-id] %s [member-id] %s", partition.getId(), + KubernetesClusterContext kubernetesClusterContext = CloudControllerContext.getInstance() + .getKubernetesClusterContext(kubernetesClusterId); + handleNullObject(kubernetesClusterContext, String.format( + "Could not terminate container, kubernetes cluster " + + "context not found: [partition-id] %s [member-id] %s", partition.getId(), memberContext.getMemberId())); KubernetesApiClient kubApi = kubernetesClusterContext.getKubApi(); @@ -889,18 +874,16 @@ public class KubernetesIaas extends Iaas { // Persist changes CloudControllerContext.getInstance().persist(); - log.info(String.format("Kubernetes pod removed successfully: [application] %s [cartridge] %s " + - "[member] %s [pod] %s", - memberContext.getApplicationId(), memberContext.getCartridgeType(), memberContext.getMemberId(), + log.info(String.format("Kubernetes pod removed successfully: [application] %s [cartridge] %s " + + "[member] %s [pod] %s", memberContext.getApplicationId(), + memberContext.getCartridgeType(), memberContext.getMemberId(), memberContext.getKubernetesPodId())); - } - catch (KubernetesClientException ignore) { + } catch (KubernetesClientException ignore) { // we can't do nothing here log.warn(String.format("Could not delete pod: [pod-id] %s", memberContext.getKubernetesPodId())); } return memberContext; - } - finally { + } finally { if (lock != null) { CloudControllerContext.getInstance().releaseWriteLock(lock); } @@ -918,8 +901,7 @@ public class KubernetesIaas extends Iaas { * @return */ private KubernetesClusterContext getKubernetesClusterContext(String kubernetesClusterId, String kubernetesMasterIp, - String kubernetesMasterPort, int upperPort, - int lowerPort) { + String kubernetesMasterPort, int upperPort, int lowerPort) { KubernetesClusterContext kubernetesClusterContext = CloudControllerContext.getInstance(). getKubernetesClusterContext(kubernetesClusterId); @@ -1009,29 +991,28 @@ public class KubernetesIaas extends Iaas { public static void removeKubernetesServices(ClusterContext clusterContext, String clusterInstanceId) { if (clusterContext != null) { - ArrayList<KubernetesService> kubernetesServices = - Lists.newArrayList(clusterContext.getKubernetesServices(clusterInstanceId)); + ArrayList<KubernetesService> kubernetesServices = Lists + .newArrayList(clusterContext.getKubernetesServices(clusterInstanceId)); for (KubernetesService kubernetesService : kubernetesServices) { - KubernetesClusterContext kubernetesClusterContext = - CloudControllerContext.getInstance() - .getKubernetesClusterContext(kubernetesService.getKubernetesClusterId()); + KubernetesClusterContext kubernetesClusterContext = CloudControllerContext.getInstance() + .getKubernetesClusterContext(kubernetesService.getKubernetesClusterId()); KubernetesApiClient kubernetesApiClient = kubernetesClusterContext.getKubApi(); String serviceId = kubernetesService.getId(); - log.info(String.format("Deleting kubernetes service: [application-id] %s " + - "[service-id] %s", clusterContext.getApplicationId(), serviceId)); + log.info(String.format("Deleting kubernetes service: [application-id] %s " + "[service-id] %s", + clusterContext.getApplicationId(), serviceId)); try { kubernetesApiClient.deleteService(serviceId); kubernetesClusterContext.deallocatePort(kubernetesService.getPort()); clusterContext.removeKubernetesService(clusterInstanceId, serviceId); - } - catch (KubernetesClientException e) { - log.error(String.format("Could not delete kubernetes service: [application-id] %s " + - "[service-id] %s", clusterContext.getApplicationId(), serviceId), e); + } catch (KubernetesClientException e) { + log.error(String.format( + "Could not delete kubernetes service: [application-id] %s " + "[service-id] %s", + clusterContext.getApplicationId(), serviceId), e); } } } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/stratos/blob/8d46fab0/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationEventReceiver.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationEventReceiver.java index 3b38e97..bd35e25 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationEventReceiver.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationEventReceiver.java @@ -23,12 +23,11 @@ import org.apache.commons.logging.LogFactory; import org.apache.stratos.cloud.controller.messaging.topology.TopologyBuilder; import org.apache.stratos.messaging.domain.application.Application; import org.apache.stratos.messaging.event.Event; -import org.apache.stratos.messaging.event.application.ApplicationDeletedEvent; import org.apache.stratos.messaging.event.application.ApplicationInstanceTerminatedEvent; -import org.apache.stratos.messaging.listener.application.ApplicationDeletedEventListener; import org.apache.stratos.messaging.listener.application.ApplicationInstanceTerminatedEventListener; import org.apache.stratos.messaging.message.receiver.application.ApplicationManager; import org.apache.stratos.messaging.message.receiver.application.ApplicationsEventReceiver; +import org.wso2.carbon.registry.core.exceptions.RegistryException; import java.util.concurrent.ExecutorService; @@ -43,11 +42,9 @@ public class ApplicationEventReceiver { public ApplicationEventReceiver() { this.applicationsEventReceiver = new ApplicationsEventReceiver(); addEventListeners(); - } public void execute() { - if (log.isInfoEnabled()) { log.info("Cloud controller application event receiver thread started"); } @@ -56,22 +53,24 @@ public class ApplicationEventReceiver { } private void addEventListeners() { - applicationsEventReceiver.addEventListener(new ApplicationInstanceTerminatedEventListener() { @Override protected void onEvent(Event event) { // Remove the application related data - ApplicationInstanceTerminatedEvent instanceTerminatedEvent = - (ApplicationInstanceTerminatedEvent) event; - log.info("Application instance terminated event received: [application-id] " + - instanceTerminatedEvent.getAppId()); + ApplicationInstanceTerminatedEvent instanceTerminatedEvent = (ApplicationInstanceTerminatedEvent) event; + log.info("Application instance terminated event received: [application-id] " + instanceTerminatedEvent + .getAppId()); String appId = instanceTerminatedEvent.getAppId(); Application application = ApplicationManager.getApplications(). getApplication(instanceTerminatedEvent.getAppId()); - if(application.getInstanceContextCount() == 0) { - TopologyBuilder.handleApplicationClustersRemoved(appId, - application.getClusterDataRecursively()); + if (application.getInstanceContextCount() == 0) { + try { + TopologyBuilder + .handleApplicationClustersRemoved(appId, application.getClusterDataRecursively()); + } catch (RegistryException e) { + log.error("Failed to process application instance terminated event", e); + } } } }); http://git-wip-us.apache.org/repos/asf/stratos/blob/8d46fab0/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java index 1afe6d8..daa6bf5 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java @@ -25,6 +25,7 @@ import org.apache.stratos.messaging.event.Event; import org.apache.stratos.messaging.event.cluster.status.*; import org.apache.stratos.messaging.listener.cluster.status.*; import org.apache.stratos.messaging.message.receiver.cluster.status.ClusterStatusEventReceiver; +import org.wso2.carbon.registry.core.exceptions.RegistryException; import java.util.concurrent.ExecutorService; @@ -55,7 +56,11 @@ public class ClusterStatusTopicReceiver { statusEventReceiver.addEventListener(new ClusterStatusClusterResetEventListener() { @Override protected void onEvent(Event event) { - TopologyBuilder.handleClusterReset((ClusterStatusClusterResetEvent) event); + try { + TopologyBuilder.handleClusterReset((ClusterStatusClusterResetEvent) event); + } catch (RegistryException e) { + log.error("Failed to process cluster status reset event", e); + } } }); @@ -69,28 +74,44 @@ public class ClusterStatusTopicReceiver { statusEventReceiver.addEventListener(new ClusterStatusClusterActivatedEventListener() { @Override protected void onEvent(Event event) { - TopologyBuilder.handleClusterActivatedEvent((ClusterStatusClusterActivatedEvent) event); + try { + TopologyBuilder.handleClusterActivatedEvent((ClusterStatusClusterActivatedEvent) event); + } catch (RegistryException e) { + log.error("Failed to process cluster activated event", e); + } } }); statusEventReceiver.addEventListener(new ClusterStatusClusterTerminatedEventListener() { @Override protected void onEvent(Event event) { - TopologyBuilder.handleClusterTerminatedEvent((ClusterStatusClusterTerminatedEvent) event); + try { + TopologyBuilder.handleClusterTerminatedEvent((ClusterStatusClusterTerminatedEvent) event); + } catch (RegistryException e) { + log.error("Failed to process cluster termination event", e); + } } }); statusEventReceiver.addEventListener(new ClusterStatusClusterTerminatingEventListener() { @Override protected void onEvent(Event event) { - TopologyBuilder.handleClusterTerminatingEvent((ClusterStatusClusterTerminatingEvent) event); + try { + TopologyBuilder.handleClusterTerminatingEvent((ClusterStatusClusterTerminatingEvent) event); + } catch (RegistryException e) { + log.error("Failed to process cluster termination event", e); + } } }); statusEventReceiver.addEventListener(new ClusterStatusClusterInactivateEventListener() { @Override protected void onEvent(Event event) { - TopologyBuilder.handleClusterInactivateEvent((ClusterStatusClusterInactivateEvent) event); + try { + TopologyBuilder.handleClusterInactivateEvent((ClusterStatusClusterInactivateEvent) event); + } catch (RegistryException e) { + log.error("Failed to process cluster inactive event", e); + } } }); } http://git-wip-us.apache.org/repos/asf/stratos/blob/8d46fab0/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java index cf69aff..2d8a275 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java @@ -31,6 +31,7 @@ import org.apache.stratos.messaging.listener.instance.status.InstanceMaintenance import org.apache.stratos.messaging.listener.instance.status.InstanceReadyToShutdownEventListener; import org.apache.stratos.messaging.listener.instance.status.InstanceStartedEventListener; import org.apache.stratos.messaging.message.receiver.instance.status.InstanceStatusEventReceiver; +import org.wso2.carbon.registry.core.exceptions.RegistryException; import java.util.concurrent.ExecutorService; @@ -65,7 +66,11 @@ public class InstanceStatusTopicReceiver { statusEventReceiver.addEventListener(new InstanceActivatedEventListener() { @Override protected void onEvent(Event event) { - TopologyBuilder.handleMemberActivated((InstanceActivatedEvent) event); + try { + TopologyBuilder.handleMemberActivated((InstanceActivatedEvent) event); + } catch (RegistryException e) { + log.error("Could not persist data in registry data store", e); + } } }); @@ -82,7 +87,7 @@ public class InstanceStatusTopicReceiver { try { TopologyBuilder.handleMemberReadyToShutdown((InstanceReadyToShutdownEvent) event); } catch (Exception e) { - String error = "Failed to retrieve the instance status event message"; + String error = "Failed to process the instance status event message"; log.error(error, e); } } @@ -94,7 +99,7 @@ public class InstanceStatusTopicReceiver { try { TopologyBuilder.handleMemberMaintenance((InstanceMaintenanceModeEvent) event); } catch (Exception e) { - String error = "Failed to retrieve the instance status event message"; + String error = "Failed to process the instance status event message"; log.error(error, e); } }
