DaanHoogland commented on a change in pull request #3680: [WIP: DO NOT MERGE] CloudStack Kubernetes Service URL: https://github.com/apache/cloudstack/pull/3680#discussion_r364162651
########## File path: plugins/integrations/kubernetes-service/src/main/java/com/cloud/kubernetescluster/KubernetesClusterManagerImpl.java ########## @@ -0,0 +1,3105 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package com.cloud.kubernetescluster; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.lang.reflect.Field; +import java.math.BigInteger; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.MalformedURLException; +import java.net.Socket; +import java.net.URL; +import java.net.UnknownHostException; +import java.nio.charset.Charset; +import java.security.SecureRandom; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import javax.inject.Inject; +import javax.naming.ConfigurationException; + +import org.apache.cloudstack.acl.ControlledEntity; +import org.apache.cloudstack.acl.SecurityChecker; +import org.apache.cloudstack.api.ApiConstants; +import org.apache.cloudstack.api.BaseCmd; +import org.apache.cloudstack.api.command.user.firewall.CreateFirewallRuleCmd; +import org.apache.cloudstack.api.command.user.kubernetescluster.CreateKubernetesClusterCmd; +import org.apache.cloudstack.api.command.user.kubernetescluster.DeleteKubernetesClusterCmd; +import org.apache.cloudstack.api.command.user.kubernetescluster.GetKubernetesClusterConfigCmd; +import org.apache.cloudstack.api.command.user.kubernetescluster.ListKubernetesClustersCmd; +import org.apache.cloudstack.api.command.user.kubernetescluster.ScaleKubernetesClusterCmd; +import org.apache.cloudstack.api.command.user.kubernetescluster.StartKubernetesClusterCmd; +import org.apache.cloudstack.api.command.user.kubernetescluster.StopKubernetesClusterCmd; +import org.apache.cloudstack.api.command.user.kubernetescluster.UpgradeKubernetesClusterCmd; +import org.apache.cloudstack.api.command.user.vm.StartVMCmd; +import org.apache.cloudstack.api.response.KubernetesClusterConfigResponse; +import org.apache.cloudstack.api.response.KubernetesClusterResponse; +import org.apache.cloudstack.api.response.ListResponse; +import org.apache.cloudstack.ca.CAManager; +import org.apache.cloudstack.context.CallContext; +import org.apache.cloudstack.engine.orchestration.service.NetworkOrchestrationService; +import org.apache.cloudstack.engine.subsystem.api.storage.ObjectInDataStoreStateMachine; +import org.apache.cloudstack.framework.ca.Certificate; +import org.apache.cloudstack.framework.config.ConfigKey; +import org.apache.cloudstack.framework.config.dao.ConfigurationDao; +import org.apache.cloudstack.managed.context.ManagedContextRunnable; +import org.apache.cloudstack.utils.security.CertUtils; +import org.apache.commons.codec.binary.Base64; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.io.IOUtils; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; + +import com.cloud.api.ApiDBUtils; +import com.cloud.api.query.dao.NetworkOfferingJoinDao; +import com.cloud.api.query.dao.TemplateJoinDao; +import com.cloud.api.query.vo.NetworkOfferingJoinVO; +import com.cloud.api.query.vo.TemplateJoinVO; +import com.cloud.capacity.CapacityManager; +import com.cloud.dc.ClusterDetailsDao; +import com.cloud.dc.ClusterDetailsVO; +import com.cloud.dc.ClusterVO; +import com.cloud.dc.DataCenter; +import com.cloud.dc.DataCenterVO; +import com.cloud.dc.Pod; +import com.cloud.dc.Vlan; +import com.cloud.dc.VlanVO; +import com.cloud.dc.dao.ClusterDao; +import com.cloud.dc.dao.DataCenterDao; +import com.cloud.dc.dao.VlanDao; +import com.cloud.deploy.DeployDestination; +import com.cloud.exception.ConcurrentOperationException; +import com.cloud.exception.InsufficientAddressCapacityException; +import com.cloud.exception.InsufficientCapacityException; +import com.cloud.exception.InsufficientServerCapacityException; +import com.cloud.exception.InvalidParameterValueException; +import com.cloud.exception.ManagementServerException; +import com.cloud.exception.NetworkRuleConflictException; +import com.cloud.exception.PermissionDeniedException; +import com.cloud.exception.ResourceAllocationException; +import com.cloud.exception.ResourceUnavailableException; +import com.cloud.exception.VirtualMachineMigrationException; +import com.cloud.host.Host.Type; +import com.cloud.host.HostVO; +import com.cloud.hypervisor.Hypervisor; +import com.cloud.kubernetescluster.dao.KubernetesClusterDao; +import com.cloud.kubernetescluster.dao.KubernetesClusterDetailsDao; +import com.cloud.kubernetescluster.dao.KubernetesClusterVmMapDao; +import com.cloud.kubernetesversion.KubernetesSupportedVersion; +import com.cloud.kubernetesversion.KubernetesSupportedVersionVO; +import com.cloud.kubernetesversion.KubernetesVersionManagerImpl; +import com.cloud.kubernetesversion.dao.KubernetesSupportedVersionDao; +import com.cloud.network.IpAddress; +import com.cloud.network.IpAddressManager; +import com.cloud.network.Network; +import com.cloud.network.Network.Service; +import com.cloud.network.NetworkModel; +import com.cloud.network.NetworkService; +import com.cloud.network.PhysicalNetwork; +import com.cloud.network.addr.PublicIp; +import com.cloud.network.dao.FirewallRulesDao; +import com.cloud.network.dao.IPAddressDao; +import com.cloud.network.dao.NetworkDao; +import com.cloud.network.dao.NetworkVO; +import com.cloud.network.dao.PhysicalNetworkDao; +import com.cloud.network.firewall.FirewallService; +import com.cloud.network.lb.LoadBalancingRulesService; +import com.cloud.network.rules.FirewallRule; +import com.cloud.network.rules.FirewallRuleVO; +import com.cloud.network.rules.LoadBalancer; +import com.cloud.network.rules.PortForwardingRuleVO; +import com.cloud.network.rules.RulesService; +import com.cloud.network.rules.dao.PortForwardingRulesDao; +import com.cloud.offering.NetworkOffering; +import com.cloud.offering.ServiceOffering; +import com.cloud.offerings.NetworkOfferingVO; +import com.cloud.offerings.dao.NetworkOfferingDao; +import com.cloud.offerings.dao.NetworkOfferingServiceMapDao; +import com.cloud.org.Grouping; +import com.cloud.resource.ResourceManager; +import com.cloud.service.ServiceOfferingVO; +import com.cloud.service.dao.ServiceOfferingDao; +import com.cloud.storage.Storage; +import com.cloud.storage.VMTemplateVO; +import com.cloud.storage.VMTemplateZoneVO; +import com.cloud.storage.dao.VMTemplateDao; +import com.cloud.storage.dao.VMTemplateZoneDao; +import com.cloud.template.TemplateApiService; +import com.cloud.template.VirtualMachineTemplate; +import com.cloud.user.Account; +import com.cloud.user.AccountManager; +import com.cloud.user.AccountService; +import com.cloud.user.SSHKeyPairVO; +import com.cloud.user.User; +import com.cloud.user.dao.AccountDao; +import com.cloud.user.dao.SSHKeyPairDao; +import com.cloud.uservm.UserVm; +import com.cloud.utils.Pair; +import com.cloud.utils.StringUtils; +import com.cloud.utils.component.ComponentContext; +import com.cloud.utils.component.ManagerBase; +import com.cloud.utils.concurrency.NamedThreadFactory; +import com.cloud.utils.db.Filter; +import com.cloud.utils.db.GlobalLock; +import com.cloud.utils.db.SearchCriteria; +import com.cloud.utils.db.Transaction; +import com.cloud.utils.db.TransactionCallback; +import com.cloud.utils.db.TransactionCallbackNoReturn; +import com.cloud.utils.db.TransactionCallbackWithException; +import com.cloud.utils.db.TransactionStatus; +import com.cloud.utils.exception.CloudRuntimeException; +import com.cloud.utils.exception.ExecutionException; +import com.cloud.utils.fsm.NoTransitionException; +import com.cloud.utils.fsm.StateMachine2; +import com.cloud.utils.net.Ip; +import com.cloud.utils.net.NetUtils; +import com.cloud.utils.ssh.SshHelper; +import com.cloud.vm.Nic; +import com.cloud.vm.ReservationContext; +import com.cloud.vm.ReservationContextImpl; +import com.cloud.vm.UserVmManager; +import com.cloud.vm.UserVmService; +import com.cloud.vm.UserVmVO; +import com.cloud.vm.VMInstanceVO; +import com.cloud.vm.VirtualMachine; +import com.cloud.vm.dao.UserVmDao; +import com.cloud.vm.dao.VMInstanceDao; +import com.google.common.base.Strings; + +public class KubernetesClusterManagerImpl extends ManagerBase implements KubernetesClusterService { + + private static final Logger LOGGER = Logger.getLogger(KubernetesClusterManagerImpl.class); + + protected StateMachine2<KubernetesCluster.State, KubernetesCluster.Event, KubernetesCluster> _stateMachine = KubernetesCluster.State.getStateMachine(); + + ScheduledExecutorService _gcExecutor; + ScheduledExecutorService _stateScanner; + + @Inject + protected KubernetesClusterDao kubernetesClusterDao; + @Inject + protected KubernetesClusterVmMapDao kubernetesClusterVmMapDao; + @Inject + protected KubernetesClusterDetailsDao kubernetesClusterDetailsDao; + @Inject + protected KubernetesSupportedVersionDao kubernetesSupportedVersionDao; + @Inject + protected CAManager caManager; + @Inject + protected SSHKeyPairDao sshKeyPairDao; + @Inject + protected DataCenterDao dataCenterDao; + @Inject + protected ClusterDao clusterDao; + @Inject + protected ClusterDetailsDao clusterDetailsDao; + @Inject + protected ServiceOfferingDao serviceOfferingDao; + @Inject + protected VMTemplateDao templateDao; + @Inject + protected TemplateApiService templateService; + @Inject + protected VMTemplateZoneDao templateZoneDao; + @Inject + protected TemplateJoinDao templateJoinDao; + @Inject + protected AccountService accountService; + @Inject + protected AccountDao accountDao; + @Inject + protected AccountManager accountManager; + @Inject + protected VMInstanceDao vmInstanceDao; + @Inject + protected UserVmDao userVmDao; + @Inject + protected UserVmService userVmService; + @Inject + protected UserVmManager userVmManager; + @Inject + protected ConfigurationDao globalConfigDao; + @Inject + protected NetworkOfferingDao networkOfferingDao; + @Inject + protected NetworkOfferingJoinDao networkOfferingJoinDao; + @Inject + protected NetworkService networkService; + @Inject + protected NetworkModel networkModel; + @Inject + protected PhysicalNetworkDao physicalNetworkDao; + @Inject + protected NetworkOrchestrationService networkMgr; + @Inject + protected NetworkDao networkDao; + @Inject + protected IPAddressDao ipAddressDao; + @Inject + protected PortForwardingRulesDao portForwardingRulesDao; + @Inject + protected FirewallService firewallService; + @Inject + protected RulesService rulesService; + @Inject + protected NetworkOfferingServiceMapDao networkOfferingServiceMapDao; + @Inject + protected CapacityManager capacityManager; + @Inject + protected ResourceManager resourceManager; + @Inject + protected FirewallRulesDao firewallRulesDao; + @Inject + protected IpAddressManager ipAddressManager; + @Inject + protected LoadBalancingRulesService lbService; + @Inject + protected VlanDao vlanDao; + + private static final String CLUSTER_NODE_VM_USER = "core"; + private static final int CLUSTER_API_PORT = 6443; + private static final int CLUSTER_NODES_DEFAULT_START_SSH_PORT = 2222; + + private static String getStackTrace(final Throwable throwable) { + final StringWriter sw = new StringWriter(); + final PrintWriter pw = new PrintWriter(sw, true); + throwable.printStackTrace(pw); + return sw.getBuffer().toString(); + } + + private String readResourceFile(String resource) throws IOException { + return IOUtils.toString(Objects.requireNonNull(Thread.currentThread().getContextClassLoader().getResourceAsStream(resource)), Charset.defaultCharset().name()); + } + + private void logMessage(final Level logLevel, final String message, final Exception e) { + if (logLevel == Level.DEBUG) { + LOGGER.debug(message); + } else if (logLevel == Level.ERROR) { + LOGGER.error(message); + } if (logLevel == Level.WARN) { + if (e != null) { + LOGGER.warn(message, e); + } else { + LOGGER.warn(message); + } + } else { + if (e != null) { + LOGGER.error(message, e); + } else { + LOGGER.error(message); + } + } + } + + private void logTransitStateAndThrow(final Level logLevel, final String message, final Long kubernetesClusterId, final KubernetesCluster.Event event, final Exception e) throws CloudRuntimeException { + logMessage(logLevel, message, e); + if (kubernetesClusterId != null && event != null) { + stateTransitTo(kubernetesClusterId, event); + } + if (e == null) { + throw new CloudRuntimeException(message); + } + throw new CloudRuntimeException(message, e); + } + + private void logTransitStateDetachIsoAndThrow(final Level logLevel, final String message, final KubernetesCluster kubernetesCluster, + final List<Long> clusterVMIds, final KubernetesCluster.Event event, final Exception e) throws CloudRuntimeException { + logMessage(logLevel, message, e); + stateTransitTo(kubernetesCluster.getId(), event); + detachIsoKubernetesVMs(kubernetesCluster, clusterVMIds); + if (e == null) { + throw new CloudRuntimeException(message); + } + throw new CloudRuntimeException(message, e); + } + + private void logTransitStateAndThrow(final Level logLevel, final String message, final Long kubernetesClusterId, final KubernetesCluster.Event event) throws CloudRuntimeException { + logTransitStateAndThrow(logLevel, message, kubernetesClusterId, event, null); + } + + private void logAndThrow(final Level logLevel, final String message) throws CloudRuntimeException { + logTransitStateAndThrow(logLevel, message, null, null, null); + } + + private void logAndThrow(final Level logLevel, final String message, final Exception ex) throws CloudRuntimeException { + logTransitStateAndThrow(logLevel, message, null, null, ex); + } + + private boolean isKubernetesServiceTemplateConfigured(DataCenter zone) { + // Check Kubernetes VM template for zone + String templateName = KubernetesClusterTemplateName.value(); + if (templateName == null || templateName.isEmpty()) { + LOGGER.warn(String.format("Global setting %s is empty. Template name need to be specified for Kubernetes service to function", KubernetesClusterTemplateName.key())); + return false; + } + final VMTemplateVO template = templateDao.findByTemplateName(templateName); + if (template == null) { + LOGGER.warn(String.format("Unable to find the template %s to be used for provisioning Kubernetes cluster", templateName)); + return false; + } + return true; + } + + private boolean isKubernetesServiceNetworkOfferingConfigured(DataCenter zone) { + // Check network offering + String networkOfferingName = KubernetesClusterNetworkOffering.value(); + if (networkOfferingName == null || networkOfferingName.isEmpty()) { + LOGGER.warn(String.format("Global setting %s is empty. Admin has not yet specified the network offering to be used for provisioning isolated network for the cluster", KubernetesClusterNetworkOffering.key())); + return false; + } + NetworkOfferingVO networkOffering = networkOfferingDao.findByUniqueName(networkOfferingName); + if (networkOffering == null) { + LOGGER.warn(String.format("Unable to find the network offering %s to be used for provisioning Kubernetes cluster", networkOfferingName)); + return false; + } + if (networkOffering.getState() == NetworkOffering.State.Disabled) { + LOGGER.warn(String.format("Network offering ID: %s is not enabled", networkOffering.getUuid())); + return false; + } + List<String> services = networkOfferingServiceMapDao.listServicesForNetworkOffering(networkOffering.getId()); + if (services == null || services.isEmpty() || !services.contains("SourceNat")) { + LOGGER.warn(String.format("Network offering ID: %s does not have necessary services to provision Kubernetes cluster", networkOffering.getUuid())); + return false; + } + if (!networkOffering.isEgressDefaultPolicy()) { + LOGGER.warn(String.format("Network offering ID: %s has egress default policy turned off should be on to provision Kubernetes cluster", networkOffering.getUuid())); + return false; + } + boolean offeringAvailableForZone = false; + List<NetworkOfferingJoinVO> networkOfferingJoinVOs = networkOfferingJoinDao.findByZoneId(zone.getId(), true); + for (NetworkOfferingJoinVO networkOfferingJoinVO : networkOfferingJoinVOs) { + if (networkOffering.getId() == networkOfferingJoinVO.getId()) { + offeringAvailableForZone = true; + break; + } + } + if (!offeringAvailableForZone) { + LOGGER.warn(String.format("Network offering ID: %s is not available for zone ID: %s", networkOffering.getUuid(), zone.getUuid())); + return false; + } + long physicalNetworkId = networkModel.findPhysicalNetworkId(zone.getId(), networkOffering.getTags(), networkOffering.getTrafficType()); + PhysicalNetwork physicalNetwork = physicalNetworkDao.findById(physicalNetworkId); + if (physicalNetwork == null) { + LOGGER.warn(String.format("Unable to find physical network with tag: %s", networkOffering.getTags())); + return false; + } + return true; + } + + private boolean isKubernetesServiceConfigured(DataCenter zone) { + if (!isKubernetesServiceTemplateConfigured(zone)) { + return false; + } + if (!isKubernetesServiceNetworkOfferingConfigured(zone)) { + return false; + } + return true; + } + + private File getManagementServerSshPublicKeyFile() { + boolean devel = Boolean.parseBoolean(globalConfigDao.getValue("developer")); + String keyFile = String.format("%s/.ssh/id_rsa", System.getProperty("user.home")); + if (devel) { + keyFile += ".cloud"; + } + return new File(keyFile); + } + + private String generateClusterToken(KubernetesCluster kubernetesCluster) { + String token = kubernetesCluster.getUuid(); + token = token.replaceAll("-", ""); + token = token.substring(0, 22); + token = token.substring(0, 6) + "." + token.substring(6); + return token; + } + + private String generateClusterHACertificateKey(KubernetesCluster kubernetesCluster) { + String uuid = kubernetesCluster.getUuid(); + StringBuilder token = new StringBuilder(uuid.replaceAll("-", "")); + while (token.length() < 64) { + token.append(token); + } + return token.toString().substring(0, 64); + } + + private KubernetesClusterVmMapVO addKubernetesClusterVm(final long kubernetesClusterId, final long vmId) { + return Transaction.execute(new TransactionCallback<KubernetesClusterVmMapVO>() { + @Override + public KubernetesClusterVmMapVO doInTransaction(TransactionStatus status) { + KubernetesClusterVmMapVO newClusterVmMap = new KubernetesClusterVmMapVO(kubernetesClusterId, vmId); + kubernetesClusterVmMapDao.persist(newClusterVmMap); + return newClusterVmMap; + } + }); + } + + private boolean isKubernetesClusterServerRunning(KubernetesCluster kubernetesCluster, String ipAddress, int retries, long waitDuration) { + int retryCounter = 0; + boolean k8sApiServerSetup = false; + while (retryCounter < retries) { + try { + String versionOutput = IOUtils.toString(new URL(String.format("https://%s:%d/version", ipAddress, CLUSTER_API_PORT)), StringUtils.getPreferredCharset()); + if (!Strings.isNullOrEmpty(versionOutput)) { + LOGGER.debug(String.format("Kubernetes cluster ID: %s API has been successfully provisioned, %s", kubernetesCluster.getUuid(), versionOutput)); + k8sApiServerSetup = true; + break; + } + } catch (Exception e) { + LOGGER.warn(String.format("API endpoint for Kubernetes cluster ID: %s not available. Attempt: %d/%d", kubernetesCluster.getUuid(), retryCounter+1, retries), e); + } + try { + Thread.sleep(waitDuration); + } catch (InterruptedException ie) { + LOGGER.error(String.format("Error while waiting for Kubernetes cluster ID: %s API endpoint to be available", kubernetesCluster.getUuid()), ie); + } + retryCounter++; + } + return k8sApiServerSetup; + } + + private String getKubernetesClusterConfig(KubernetesCluster kubernetesCluster, String ipAddress, int port, int retries) { + int retryCounter = 0; + String kubeConfig = ""; + while (retryCounter < retries) { + try { + Pair<Boolean, String> result = SshHelper.sshExecute(ipAddress, port, CLUSTER_NODE_VM_USER, + getManagementServerSshPublicKeyFile(), null, "sudo cat /etc/kubernetes/admin.conf", + 10000, 10000, 10000); + + if (result.first() && !Strings.isNullOrEmpty(result.second())) { + kubeConfig = result.second(); + break; + } else { + LOGGER.debug(String.format("Failed to retrieve kube-config file for Kubernetes cluster ID: %s. Output: %s", kubernetesCluster.getUuid(), result.second())); + } + } catch (Exception e) { + LOGGER.warn(String.format("Failed to retrieve kube-config file for Kubernetes cluster ID: %s. Attempt: %d/%d", kubernetesCluster.getUuid(), retryCounter+1, retries), e); + } + retryCounter++; + } + return kubeConfig; + } + + private boolean isKubernetesClusterAddOnServiceRunning(KubernetesCluster kubernetesCluster, final String ipAddress, final int port, final String namespace, String serviceName) { + try { + String cmd = "sudo kubectl get pods --all-namespaces"; + if (!Strings.isNullOrEmpty(namespace)) { + cmd = String.format("sudo kubectl get pods --namespace=%s", namespace); + } + Pair<Boolean, String> result = SshHelper.sshExecute(ipAddress, port, CLUSTER_NODE_VM_USER, + getManagementServerSshPublicKeyFile(), null, cmd, + 10000, 10000, 10000); + if (result.first() && !Strings.isNullOrEmpty(result.second())) { + String[] lines = result.second().split("\n"); + for (String line : + lines) { + if (line.contains(serviceName) && line.contains("Running")) { + LOGGER.debug(String.format("Service : %s in namespace: %s for the Kubernetes cluster ID: %s is running",serviceName, namespace, kubernetesCluster.getUuid())); + return true; + } + } + } + } catch (Exception e) { + LOGGER.warn(String.format("Unable to retrieve service: %s running status in namespace %s for Kubernetes cluster ID: %s", serviceName, namespace, kubernetesCluster.getUuid()), e); + } + return false; + } + + private boolean isKubernetesClusterDashboardServiceRunning(KubernetesCluster kubernetesCluster, String ipAddress, int port, int retries, long waitDuration) { + boolean running = false; + int retryCounter = 0; + // Check if dashboard service is up running. + while (retryCounter < retries) { + LOGGER.debug(String.format("Checking dashboard service for the Kubernetes cluster ID: %s to come up. Attempt: %d/%d", kubernetesCluster.getUuid(), retryCounter+1, retries)); + if (isKubernetesClusterAddOnServiceRunning(kubernetesCluster, ipAddress, port, "kubernetes-dashboard", "kubernetes-dashboard")) { + LOGGER.info(String.format("Dashboard service for the Kubernetes cluster ID: %s is in running state", kubernetesCluster.getUuid())); + running = true; + break; + } + try { + Thread.sleep(waitDuration); + } catch (InterruptedException ex) { + LOGGER.error(String.format("Error while waiting for Kubernetes cluster: %s API dashboard service to be available", kubernetesCluster.getUuid()), ex); + } + retryCounter++; + } + return running; + } + + private UserVm fetchMasterVmIfMissing(final KubernetesCluster kubernetesCluster, final int port, final UserVm masterVm) { + if (masterVm != null) { + return masterVm; + } + List<KubernetesClusterVmMapVO> clusterVMs = kubernetesClusterVmMapDao.listByClusterId(kubernetesCluster.getId()); + if (CollectionUtils.isEmpty(clusterVMs)) { + LOGGER.warn(String.format("Unable to retrieve VMs for Kubernetes cluster ID: %s", kubernetesCluster.getUuid())); + return null; + } + List<Long> vmIds = new ArrayList<>(); + for (KubernetesClusterVmMapVO vmMap : clusterVMs) { + vmIds.add(vmMap.getVmId()); + } + Collections.sort(vmIds); + return userVmDao.findById(vmIds.get(0)); + } + + private Pair<String, Integer> getKubernetesClusterServerIpSshPort(KubernetesCluster kubernetesCluster, UserVm masterVm) { + int port = CLUSTER_NODES_DEFAULT_START_SSH_PORT; + KubernetesClusterDetailsVO detail = kubernetesClusterDetailsDao.findDetail(kubernetesCluster.getId(), ApiConstants.EXTERNAL_LOAD_BALANCER_IP_ADDRESS); + if (detail != null && !Strings.isNullOrEmpty(detail.getValue())) { + return new Pair<>(detail.getValue(), port); + } + Network network = networkDao.findById(kubernetesCluster.getNetworkId()); + if (network == null) { + LOGGER.warn(String.format("Network for Kubernetes cluster ID: %s cannot be found", kubernetesCluster.getUuid())); + return new Pair<>(null, port); + } + if (Network.GuestType.Isolated.equals(network.getGuestType())) { + List<? extends IpAddress> addresses = networkModel.listPublicIpsAssignedToGuestNtwk(network.getId(), true); + if (CollectionUtils.isEmpty(addresses)) { + LOGGER.warn(String.format("No public IP addresses found for network ID: %s, Kubernetes cluster ID: %s", network.getUuid(), kubernetesCluster.getUuid())); + return new Pair<>(null, port); + } + for (IpAddress address : addresses) { + if (address.isSourceNat()) { + return new Pair<>(address.getAddress().addr(), port); + } + } + LOGGER.warn(String.format("No source NAT IP addresses found for network ID: %s, Kubernetes cluster ID: %s", network.getUuid(), kubernetesCluster.getUuid())); + return new Pair<>(null, port); + } else if (Network.GuestType.Shared.equals(network.getGuestType())) { + port = 22; + masterVm = fetchMasterVmIfMissing(kubernetesCluster, port, masterVm); + if (masterVm == null) { + LOGGER.warn(String.format("Unable to retrieve master VM for Kubernetes cluster ID: %s", kubernetesCluster.getUuid())); + return new Pair<>(null, port); + } + return new Pair<>(masterVm.getPrivateIpAddress(), port); + } + LOGGER.warn(String.format("Unable to retrieve server IP address for Kubernetes cluster ID: %s", kubernetesCluster.getUuid())); + return new Pair<>(null, port); + } + + private Pair<String, Integer> getKubernetesClusterServerIpSshPort(KubernetesCluster kubernetesCluster) { + return getKubernetesClusterServerIpSshPort(kubernetesCluster, null); + } + + private int getKubernetesClusterReadyNodesCount(KubernetesCluster kubernetesCluster, String ipAddress, int port) throws Exception { + Pair<Boolean, String> result = SshHelper.sshExecute(ipAddress, port, + CLUSTER_NODE_VM_USER, getManagementServerSshPublicKeyFile(), null, + "sudo kubectl get nodes | awk '{if ($2 == \"Ready\") print $1}' | wc -l", + 10000, 10000, 20000); + if (result.first()) { + return Integer.parseInt(result.second().trim().replace("\"", "")); + } else { + LOGGER.debug(String.format("Failed to retrieve ready nodes for Kubernetes cluster ID: %s. Output: %s", kubernetesCluster.getUuid(), result.second())); + } + return 0; + } + + private boolean isKubernetesClusterNodeReady(KubernetesCluster kubernetesCluster, String ipAddress, int port, String nodeName) throws Exception { + Pair<Boolean, String> result = SshHelper.sshExecute(ipAddress, port, + CLUSTER_NODE_VM_USER, getManagementServerSshPublicKeyFile(), null, + String.format("sudo kubectl get nodes | awk '{if ($1 == \"%s\" && $2 == \"Ready\") print $1}'", nodeName.toLowerCase()), + 10000, 10000, 20000); + if (result.first() && nodeName.equals(result.second().trim())) { + return true; + } + LOGGER.debug(String.format("Failed to retrieve status for node: %s in Kubernetes cluster ID: %s. Output: %s", nodeName, kubernetesCluster.getUuid(), result.second())); + return false; + } + + private boolean isKubernetesClusterNodeReady(KubernetesCluster kubernetesCluster, String ipAddress, int port, String nodeName, int retries, int waitDuration) { + int retryCounter = 0; + while (retryCounter < retries) { + boolean ready = false; + try { + ready = isKubernetesClusterNodeReady(kubernetesCluster, ipAddress, port, nodeName); + } catch (Exception e) { + LOGGER.warn(String.format("Failed to retrieve state of node: %s in Kubernetes cluster ID: %s", nodeName, kubernetesCluster.getUuid()), e); + } + if (ready) { + return true; + } + try { + Thread.sleep(waitDuration); + } catch (InterruptedException ie) { + LOGGER.error(String.format("Error while waiting for Kubernetes cluster ID: %s node: %s to become ready", kubernetesCluster.getUuid(), nodeName), ie); + } + retryCounter++; + } + return false; + } + + private int getKubernetesClusterReadyNodesCount(KubernetesCluster kubernetesCluster) throws Exception { + Pair<String, Integer> ipSshPort = getKubernetesClusterServerIpSshPort(kubernetesCluster); + String ipAddress = ipSshPort.first(); + int sshPort = ipSshPort.second(); + if (Strings.isNullOrEmpty(ipAddress)) { + String msg = String.format("No public IP found for Kubernetes cluster ID: %s" , kubernetesCluster.getUuid()); + LOGGER.warn(msg); + throw new ManagementServerException(msg); + } + return getKubernetesClusterReadyNodesCount(kubernetesCluster, ipAddress, sshPort); + } + + private boolean validateKubernetesClusterReadyNodesCount(KubernetesCluster kubernetesCluster, String ipAddress, int port, int retries, long waitDuration) { + int retryCounter = 0; + while (retryCounter < retries) { + // "sudo kubectl get nodes -o json | jq \".items[].metadata.name\" | wc -l" + LOGGER.debug(String.format("Checking ready nodes for the Kubernetes cluster ID: %s with total %d provisioned nodes. Attempt: %d/%d", kubernetesCluster.getUuid(), kubernetesCluster.getTotalNodeCount(), retryCounter+1, retries)); + try { + int nodesCount = getKubernetesClusterReadyNodesCount(kubernetesCluster, ipAddress, port); + if (nodesCount == kubernetesCluster.getTotalNodeCount()) { + LOGGER.debug(String.format("Kubernetes cluster ID: %s has %d ready now", kubernetesCluster.getUuid(), kubernetesCluster.getTotalNodeCount())); + return true; + } else { + LOGGER.debug(String.format("Kubernetes cluster ID: %s has total %d provisioned nodes while %d ready now", kubernetesCluster.getUuid(), kubernetesCluster.getTotalNodeCount(), nodesCount)); + } + } catch (Exception e) { + LOGGER.warn(String.format("Failed to retrieve ready node count for Kubernetes cluster ID: %s", kubernetesCluster.getUuid()), e); + } + try { + Thread.sleep(waitDuration); + } catch (InterruptedException ex) { + LOGGER.warn(String.format("Error while waiting during Kubernetes cluster ID: %s ready node check. %d/%d", kubernetesCluster.getUuid(), retryCounter+1, retries), ex); + } + retryCounter++; + } + return false; + } + + private boolean removeKubernetesClusterNode(KubernetesCluster kubernetesCluster, String ipAddress, int port, UserVm userVm, int retries, int waitDuration) { + File pkFile = getManagementServerSshPublicKeyFile(); + int retryCounter = 0; + String hostName = userVm.getHostName(); + if (!Strings.isNullOrEmpty(hostName)) { + hostName = hostName.toLowerCase(); + } + while (retryCounter < retries) { + retryCounter++; + try { + Pair<Boolean, String> result = SshHelper.sshExecute(ipAddress, port, CLUSTER_NODE_VM_USER, + pkFile, null, String.format("sudo kubectl drain %s --ignore-daemonsets --delete-local-data", hostName), + 10000, 10000, 60000); + if (!result.first()) { + LOGGER.warn(String.format("Draining node: %s on VM ID: %s in Kubernetes cluster ID: %s unsuccessful", hostName, userVm.getUuid(), kubernetesCluster.getUuid())); + } else { + result = SshHelper.sshExecute(ipAddress, port, CLUSTER_NODE_VM_USER, + pkFile, null, String.format("sudo kubectl delete node %s", hostName), + 10000, 10000, 30000); + if (result.first()) { + return true; + } else { + LOGGER.warn(String.format("Deleting node: %s on VM ID: %s in Kubernetes cluster ID: %s unsuccessful", hostName, userVm.getUuid(), kubernetesCluster.getUuid())); + } + } + break; + } catch (Exception e) { + String msg = String.format("Failed to remove Kubernetes cluster ID: %s node: %s on VM ID: %s", kubernetesCluster.getUuid(), hostName, userVm.getUuid()); + LOGGER.warn(msg, e); + } + try { + Thread.sleep(waitDuration); + } catch (InterruptedException ie) { + LOGGER.error(String.format("Error while waiting for Kubernetes cluster ID: %s node: %s on VM ID: %s removal", kubernetesCluster.getUuid(), hostName, userVm.getUuid()), ie); + } + retryCounter++; + } + return false; + } + + private boolean uncordonKubernetesClusterNode(KubernetesCluster kubernetesCluster, String ipAddress, int port, UserVm userVm, int retries, int waitDuration) { + int retryCounter = 0; + String hostName = userVm.getHostName(); + if (!Strings.isNullOrEmpty(hostName)) { + hostName = hostName.toLowerCase(); + } + while (retryCounter < retries) { + Pair<Boolean, String> result = null; + try { + result = SshHelper.sshExecute(ipAddress, port, CLUSTER_NODE_VM_USER, getManagementServerSshPublicKeyFile(), null, + String.format("sudo kubectl uncordon %s", hostName), + 10000, 10000, 30000); + if (result.first()) { + return true; + } + } catch (Exception e) { + LOGGER.warn(String.format("Failed to uncordon node: %s on VM ID: %s in Kubernetes cluster ID: %s", hostName, userVm.getUuid(), kubernetesCluster.getUuid()), e); + } + try { + Thread.sleep(waitDuration); + } catch (InterruptedException ie) { + LOGGER.warn(String.format("Error while waiting for uncordon Kubernetes cluster ID: %s node: %s on VM ID: %s", kubernetesCluster.getUuid(), hostName, userVm.getUuid()), ie); + } + retryCounter++; + } + return false; + } + + private Network startKubernetesCLusterNetwork(final KubernetesCluster kubernetesCluster, final DeployDestination destination, final Account account) throws ManagementServerException { + final ReservationContext context = new ReservationContextImpl(null, null, null, account); + Network network = networkDao.findById(kubernetesCluster.getNetworkId()); + if (network == null) { + String msg = String.format("Network for Kubernetes cluster ID: %s not found", kubernetesCluster.getUuid()); + LOGGER.warn(msg); + stateTransitTo(kubernetesCluster.getId(), KubernetesCluster.Event.CreateFailed); + throw new ManagementServerException(msg); + } + try { + networkMgr.startNetwork(network.getId(), destination, context); + LOGGER.debug(String.format("Network ID: %s is started for the Kubernetes cluster ID: %s", network.getUuid(), kubernetesCluster.getUuid())); + } catch (ConcurrentOperationException | ResourceUnavailableException |InsufficientCapacityException e) { + String msg = String.format("Failed to start Kubernetes cluster ID: %s as unable to start associated network ID: %s" , kubernetesCluster.getUuid(), network.getUuid()); + LOGGER.error(msg, e); + stateTransitTo(kubernetesCluster.getId(), KubernetesCluster.Event.CreateFailed); + throw new ManagementServerException(msg, e); + } + return network; + } + + private UserVm provisionKubernetesClusterMasterVm(final KubernetesCluster kubernetesCluster, final DeployDestination destination, final Network network, final Account account, final String publicIpAddress) throws ManagementServerException { + UserVm k8sMasterVM = null; + try { + k8sMasterVM = createKubernetesMaster(kubernetesCluster, destination.getPod(), network, account, publicIpAddress); + addKubernetesClusterVm(kubernetesCluster.getId(), k8sMasterVM.getId()); + startKubernetesVM(k8sMasterVM, kubernetesCluster); + k8sMasterVM = userVmDao.findById(k8sMasterVM.getId()); + LOGGER.debug(String.format("Provisioned the master VM ID: %s in to the Kubernetes cluster ID: %s", k8sMasterVM.getUuid(), kubernetesCluster.getUuid())); + } catch (ManagementServerException | ResourceUnavailableException | InsufficientCapacityException e) { + String msg = String.format("Provisioning the master VM failed in the Kubernetes cluster ID: %s", kubernetesCluster.getUuid()); + LOGGER.warn(msg, e); + stateTransitTo(kubernetesCluster.getId(), KubernetesCluster.Event.CreateFailed); + throw new ManagementServerException(msg, e); + } + return k8sMasterVM; + } + + private List<UserVm> provisionKubernetesClusterAdditionalMasterVms(final KubernetesCluster kubernetesCluster, final String publicIpAddress) throws ManagementServerException { + List<UserVm> additionalMasters = new ArrayList<>(); + if (kubernetesCluster.getMasterNodeCount() > 1) { + for (int i = 1; i < kubernetesCluster.getMasterNodeCount(); i++) { + UserVm vm = null; + try { + vm = createKubernetesAdditionalMaster(kubernetesCluster, publicIpAddress, i); + addKubernetesClusterVm(kubernetesCluster.getId(), vm.getId()); + startKubernetesVM(vm, kubernetesCluster); + additionalMasters.add(vm); + LOGGER.debug(String.format("Provisioned additional master VM ID: %s in to the Kubernetes cluster ID: %s", vm.getUuid(), kubernetesCluster.getUuid())); + } catch (ManagementServerException | ResourceUnavailableException | InsufficientCapacityException e) { + String msg = String.format("Provisioning additional master VM %d/%d failed in the Kubernetes cluster ID: %s", i+1, kubernetesCluster.getMasterNodeCount(), kubernetesCluster.getUuid()); + LOGGER.warn(msg, e); + stateTransitTo(kubernetesCluster.getId(), KubernetesCluster.Event.CreateFailed); + throw new ManagementServerException(msg, e); + } + } + } + return additionalMasters; + } + + private List<UserVm> provisionKubernetesClusterNodeVms(final KubernetesCluster kubernetesCluster, final String publicIpAddress) throws ManagementServerException { + List<UserVm> nodes = new ArrayList<>(); + for (int i = 1; i <= kubernetesCluster.getNodeCount(); i++) { + UserVm vm = null; + try { + vm = createKubernetesNode(kubernetesCluster, publicIpAddress, i); + addKubernetesClusterVm(kubernetesCluster.getId(), vm.getId()); + startKubernetesVM(vm, kubernetesCluster); + nodes.add(vm); + LOGGER.debug(String.format("Provisioned node master VM ID: %s in to the Kubernetes cluster ID: %s", vm.getUuid(), kubernetesCluster.getUuid())); + } catch (ManagementServerException | ResourceUnavailableException | InsufficientCapacityException e) { + String msg = String.format("Provisioning node VM %d/%d failed in the Kubernetes cluster ID: %s", i, kubernetesCluster.getNodeCount(), kubernetesCluster.getUuid()); + LOGGER.warn(msg, e); + stateTransitTo(kubernetesCluster.getId(), KubernetesCluster.Event.CreateFailed); + throw new ManagementServerException(msg, e); + } + } + return nodes; + } + + private boolean isKubernetesClusterMasterVmRunning(final KubernetesCluster kubernetesCluster, final String ipAddress, final int port, final long timeout) { + boolean masterVmRunning = false; + long startTime = System.currentTimeMillis(); + while (!masterVmRunning && System.currentTimeMillis() - startTime < timeout) { + try (Socket socket = new Socket()) { + socket.connect(new InetSocketAddress(ipAddress, port), 10000); + masterVmRunning = true; + } catch (IOException e) { + LOGGER.debug(String.format("Waiting for Kubernetes cluster ID: %s master node VMs to be accessible", kubernetesCluster.getUuid())); + try { + Thread.sleep(10000); + } catch (InterruptedException ex) { + LOGGER.warn(String.format("Error while waiting for Kubernetes cluster ID: %s master node VMs to be accessible", kubernetesCluster.getUuid()), ex); + } + } + } + return masterVmRunning; + } + + // Start cluster after creation (cluster will be started for first time therefore resources will be provisioned as well) + private boolean startKubernetesClusterOnCreate(final long kubernetesClusterId) throws ManagementServerException { + + // Starting a Kubernetes cluster has below workflow + // - start the network + // - provision the master / node VM + // - provision node VM's (as many as cluster size) + // - update the book keeping data of the VM's provisioned for the cluster + // - setup networking (add Firewall and PF rules) + // - wait till Kubernetes API server on master VM to come up + // - wait till addon services (dashboard etc) to come up + // - update API and dashboard URL endpoints in Kubernetes cluster details + + KubernetesClusterVO kubernetesCluster = kubernetesClusterDao.findById(kubernetesClusterId); + final DataCenter zone = dataCenterDao.findById(kubernetesCluster.getZoneId()); + if (zone == null) { + throw new CloudRuntimeException(String.format("Unable to find zone for Kubernetes cluster ID: %s", kubernetesCluster.getUuid())); + } + LOGGER.debug(String.format("Starting Kubernetes cluster ID: %s", kubernetesCluster.getUuid())); + stateTransitTo(kubernetesCluster.getId(), KubernetesCluster.Event.StartRequested); + Account account = accountDao.findById(kubernetesCluster.getAccountId()); + + DeployDestination dest = null; + try { + dest = plan(kubernetesCluster, zone); + } catch (InsufficientCapacityException e) { + String msg = String.format("Provisioning the cluster failed due to insufficient capacity in the Kubernetes cluster: %s", kubernetesCluster.getUuid()); + LOGGER.error(msg, e); + stateTransitTo(kubernetesCluster.getId(), KubernetesCluster.Event.CreateFailed); + throw new ManagementServerException(msg, e); + } + + Network network = startKubernetesCLusterNetwork(kubernetesCluster, dest, account); + + Pair<String, Integer> publicIpSshPort = getKubernetesClusterServerIpSshPort(kubernetesCluster); + String publicIpAddress = publicIpSshPort.first(); + if (Strings.isNullOrEmpty(publicIpAddress) && + (Network.GuestType.Isolated.equals(network.getGuestType()) || kubernetesCluster.getMasterNodeCount() > 1)) { // Shared network, single-master cluster won't have an IP yet + String msg = String.format("Failed to start Kubernetes cluster ID: %s as no public IP found for the cluster" , kubernetesCluster.getUuid()); + LOGGER.warn(msg); + stateTransitTo(kubernetesCluster.getId(), KubernetesCluster.Event.CreateFailed); + throw new ManagementServerException(msg); + } + + List<Long> clusterVMIds = new ArrayList<>(); + + UserVm k8sMasterVM = provisionKubernetesClusterMasterVm(kubernetesCluster, dest, network, account, publicIpAddress); + clusterVMIds.add(k8sMasterVM.getId()); + + if (Strings.isNullOrEmpty(publicIpAddress)) { + publicIpSshPort = getKubernetesClusterServerIpSshPort(kubernetesCluster, k8sMasterVM); + publicIpAddress = publicIpSshPort.first(); + if (Strings.isNullOrEmpty(publicIpAddress)) { + String msg = String.format("Failed to start Kubernetes cluster ID: %s as no public IP found for the cluster", kubernetesCluster.getUuid()); + LOGGER.warn(msg); + stateTransitTo(kubernetesCluster.getId(), KubernetesCluster.Event.CreateFailed); + throw new ManagementServerException(msg); + } + } + + List<UserVm> additionalMasterVMs = provisionKubernetesClusterAdditionalMasterVms(kubernetesCluster, publicIpAddress); + for (UserVm vm : additionalMasterVMs){ + clusterVMIds.add(vm.getId()); + } + + List<UserVm> nodeVMs = provisionKubernetesClusterNodeVms(kubernetesCluster, publicIpAddress); + for (UserVm vm : nodeVMs){ + clusterVMIds.add(vm.getId()); + } + + LOGGER.debug(String.format("Kubernetes cluster ID: %s VMs successfully provisioned", kubernetesCluster.getUuid())); + + try { + setupKubernetesClusterNetworkRules(kubernetesCluster, network, account, clusterVMIds); + } catch (ManagementServerException e) { + logTransitStateAndThrow(Level.ERROR, String.format("Failed to setup Kubernetes cluster ID: %s, unable to setup network rules", kubernetesCluster.getUuid()), kubernetesCluster.getId(), KubernetesCluster.Event.CreateFailed, e); + } + + attachIsoKubernetesVMs(kubernetesCluster, clusterVMIds); + + if (!isKubernetesClusterMasterVmRunning(kubernetesCluster, publicIpAddress, publicIpSshPort.second(), 10 * 60 * 1000)) { + String msg = String.format("Failed to setup Kubernetes cluster ID: %s in usable state as unable to access master node VMs of the cluster", kubernetesCluster.getUuid()); + if (kubernetesCluster.getMasterNodeCount() > 1 && Network.GuestType.Shared.equals(network.getGuestType())) { + msg = String.format("%s. Make sure external load-balancer has port forwarding rules for SSH access on ports %d-%d and API access on port %d", + msg, + CLUSTER_NODES_DEFAULT_START_SSH_PORT, + CLUSTER_NODES_DEFAULT_START_SSH_PORT + kubernetesCluster.getTotalNodeCount() - 1, + CLUSTER_API_PORT); + } + logTransitStateDetachIsoAndThrow(Level.ERROR, msg, kubernetesCluster, clusterVMIds, KubernetesCluster.Event.CreateFailed, null); + } + + boolean k8sApiServerSetup = isKubernetesClusterServerRunning(kubernetesCluster, publicIpAddress, 20, 30000); + if (!k8sApiServerSetup) { + logTransitStateDetachIsoAndThrow(Level.ERROR, String.format("Failed to setup Kubernetes cluster ID: %s in usable state as unable to provision API endpoint for the cluster", kubernetesCluster.getUuid()), kubernetesCluster, clusterVMIds, KubernetesCluster.Event.CreateFailed, null); + } + kubernetesCluster = kubernetesClusterDao.findById(kubernetesClusterId); + kubernetesCluster.setEndpoint(String.format("https://%s:%d/", publicIpAddress, CLUSTER_API_PORT)); + kubernetesClusterDao.update(kubernetesCluster.getId(), kubernetesCluster); + + int sshPort = publicIpSshPort.second(); + boolean readyNodesCountValid = validateKubernetesClusterReadyNodesCount(kubernetesCluster, publicIpAddress, sshPort, 30, 30000); + + // Detach binaries ISO from new VMs + detachIsoKubernetesVMs(kubernetesCluster, clusterVMIds); + + // Throw exception if nodes count for k8s cluster timed out + if (!readyNodesCountValid) { + logTransitStateAndThrow(Level.WARN, String.format("Failed to setup Kubernetes cluster ID: %s as it does not have desired number of nodes in ready state", kubernetesCluster.getUuid()), kubernetesCluster.getId(), KubernetesCluster.Event.CreateFailed); + } + + boolean k8sKubeConfigCopied = false; + String kubeConfig = getKubernetesClusterConfig(kubernetesCluster, publicIpAddress, sshPort, 5); + if (!Strings.isNullOrEmpty(kubeConfig)) { + k8sKubeConfigCopied = true; + } + if (!k8sKubeConfigCopied) { + logTransitStateAndThrow(Level.ERROR, String.format("Failed to setup Kubernetes cluster ID: %s in usable state as unable to retrieve kube-config for the cluster", kubernetesCluster.getUuid()), kubernetesCluster.getId(), KubernetesCluster.Event.OperationFailed); + } + kubeConfig = kubeConfig.replace(String.format("server: https://%s:%d", k8sMasterVM.getPrivateIpAddress(), CLUSTER_API_PORT), + String.format("server: https://%s:%d", publicIpAddress, CLUSTER_API_PORT)); + kubernetesClusterDetailsDao.addDetail(kubernetesCluster.getId(), "kubeConfigData", Base64.encodeBase64String(kubeConfig.getBytes(StringUtils.getPreferredCharset())), false); + + boolean dashboardServiceRunning = isKubernetesClusterDashboardServiceRunning(kubernetesCluster, publicIpAddress, sshPort, 10, 20000); + if (!dashboardServiceRunning) { + logTransitStateAndThrow(Level.ERROR, String.format("Failed to setup Kubernetes cluster ID: %s in usable state as unable to get Dashboard service running for the cluster", kubernetesCluster.getUuid()), kubernetesCluster.getId(),KubernetesCluster.Event.OperationFailed); + } + kubernetesClusterDetailsDao.addDetail(kubernetesCluster.getId(), "dashboardServiceRunning", String.valueOf(dashboardServiceRunning), false); + stateTransitTo(kubernetesCluster.getId(), KubernetesCluster.Event.OperationSucceeded); + return true; + } + + private boolean startStoppedKubernetesCluster(long kubernetesClusterId) throws ManagementServerException { + final KubernetesClusterVO kubernetesCluster = kubernetesClusterDao.findById(kubernetesClusterId); + if (kubernetesCluster == null) { + throw new ManagementServerException("Invalid Kubernetes cluster ID"); + } + if (kubernetesCluster.getRemoved() != null) { + throw new ManagementServerException(String.format("Kubernetes cluster ID: %s is already deleted", kubernetesCluster.getUuid())); + } + if (kubernetesCluster.getState().equals(KubernetesCluster.State.Running)) { + LOGGER.debug(String.format("Kubernetes cluster ID: %s is in running state", kubernetesCluster.getUuid())); + return true; + } + if (kubernetesCluster.getState().equals(KubernetesCluster.State.Starting)) { + LOGGER.debug(String.format("Kubernetes cluster ID: %s is already in starting state", kubernetesCluster.getUuid())); + return true; + } + LOGGER.debug(String.format("Starting Kubernetes cluster ID: %s", kubernetesCluster.getUuid())); + + stateTransitTo(kubernetesCluster.getId(), KubernetesCluster.Event.StartRequested); + + for (final KubernetesClusterVmMapVO vmMapVO : kubernetesClusterVmMapDao.listByClusterId(kubernetesClusterId)) { + final UserVmVO vm = userVmDao.findById(vmMapVO.getVmId()); + try { + if (vm == null) { + stateTransitTo(kubernetesCluster.getId(), KubernetesCluster.Event.OperationFailed); + throw new ManagementServerException("Failed to start all VMs in Kubernetes cluster ID: " + kubernetesClusterId); + } + startKubernetesVM(vm, kubernetesCluster); + } catch (CloudRuntimeException ex) { + LOGGER.warn(String.format("Failed to start VM in Kubernetes cluster ID: %s due to ", kubernetesCluster.getUuid()) + ex); + // dont bail out here. proceed further to stop the reset of the VM's + } + } + + for (final KubernetesClusterVmMapVO vmMapVO : kubernetesClusterVmMapDao.listByClusterId(kubernetesClusterId)) { + final UserVmVO vm = userVmDao.findById(vmMapVO.getVmId()); + if (vm == null || !vm.getState().equals(VirtualMachine.State.Running)) { + logTransitStateAndThrow(Level.ERROR, String.format("Failed to start all VMs in Kubernetes cluster ID: %s", kubernetesCluster.getUuid()), kubernetesCluster.getId(), KubernetesCluster.Event.OperationFailed); + } + } + + InetAddress address = null; + try { + address = InetAddress.getByName(new URL(kubernetesCluster.getEndpoint()).getHost()); + } catch (MalformedURLException | UnknownHostException ex) { + logTransitStateAndThrow(Level.ERROR, String.format("Kubernetes cluster ID: %s has invalid API endpoint. Can not verify if cluster is in ready state", kubernetesCluster.getUuid()), kubernetesCluster.getId(), KubernetesCluster.Event.OperationFailed); + } + + Pair<String, Integer> publicIpSshPort = getKubernetesClusterServerIpSshPort(kubernetesCluster); + String publicIpAddress = publicIpSshPort.first(); + if (Strings.isNullOrEmpty(publicIpAddress)) { + logTransitStateAndThrow(Level.ERROR, String.format("Failed to start Kubernetes cluster ID: %s as no public IP found for the cluster" , kubernetesCluster.getUuid()), kubernetesCluster.getId(), KubernetesCluster.Event.OperationFailed); + } + + boolean k8sApiServerSetup = isKubernetesClusterServerRunning(kubernetesCluster, publicIpAddress, 10, 30000); + if (!k8sApiServerSetup) { + logTransitStateAndThrow(Level.ERROR, String.format("Failed to start Kubernetes cluster ID: %s in usable state", kubernetesCluster.getUuid()), kubernetesCluster.getId(), KubernetesCluster.Event.OperationFailed); + } + + int sshPort = publicIpSshPort.second(); + KubernetesClusterDetailsVO kubeConfigDetail = kubernetesClusterDetailsDao.findDetail(kubernetesCluster.getId(), "kubeConfigData"); + if (kubeConfigDetail == null || Strings.isNullOrEmpty(kubeConfigDetail.getValue())) { + boolean k8sKubeConfigCopied = false; + String kubeConfig = getKubernetesClusterConfig(kubernetesCluster, publicIpAddress, sshPort, 5); + if (!Strings.isNullOrEmpty(kubeConfig)) { + k8sKubeConfigCopied = true; + } + if (!k8sKubeConfigCopied) { + logTransitStateAndThrow(Level.ERROR, String.format("Failed to start Kubernetes cluster ID: %s in usable state as unable to retrieve kube-config for the cluster", kubernetesCluster.getUuid()), kubernetesCluster.getId(), KubernetesCluster.Event.OperationFailed); + } + kubernetesClusterDetailsDao.addDetail(kubernetesCluster.getId(), "kubeConfigData", Base64.encodeBase64String(kubeConfig.getBytes(StringUtils.getPreferredCharset())), false); + } + KubernetesClusterDetailsVO dashboardServiceRunningDetail = kubernetesClusterDetailsDao.findDetail(kubernetesCluster.getId(), "dashboardServiceRunning"); + if (kubeConfigDetail == null || !Boolean.parseBoolean(dashboardServiceRunningDetail.getValue())) { + boolean dashboardServiceRunning = isKubernetesClusterDashboardServiceRunning(kubernetesCluster, publicIpAddress, sshPort, 10, 20000); + if (!dashboardServiceRunning) { + logTransitStateAndThrow(Level.ERROR, String.format("Failed to start Kubernetes cluster ID: %s in usable state as unable to get Dashboard service running for the cluster", kubernetesCluster.getUuid()), kubernetesCluster.getId(), KubernetesCluster.Event.OperationFailed); + } + kubernetesClusterDetailsDao.addDetail(kubernetesCluster.getId(), "dashboardServiceRunning", String.valueOf(dashboardServiceRunning), false); + } + + stateTransitTo(kubernetesCluster.getId(), KubernetesCluster.Event.OperationSucceeded); + LOGGER.debug(String.format("Kubernetes cluster ID: %s successfully started", kubernetesCluster.getUuid())); + return true; + } + + private IpAddress getSourceNatIp(Network network) { + List<? extends IpAddress> addresses = networkModel.listPublicIpsAssignedToGuestNtwk(network.getId(), true); + if (CollectionUtils.isEmpty(addresses)) { + return null; + } + for (IpAddress address : addresses) { + if (address.isSourceNat()) { + return address; + } + } + return null; + } + + private FirewallRule removeSshFirewallRule(IpAddress publicIp) { + FirewallRule rule = null; + List<FirewallRuleVO> firewallRules = firewallRulesDao.listByIpAndPurposeAndNotRevoked(publicIp.getId(), FirewallRule.Purpose.Firewall); + for (FirewallRuleVO firewallRule : firewallRules) { + if (firewallRule.getSourcePortStart() == CLUSTER_NODES_DEFAULT_START_SSH_PORT) { + rule = firewallRule; + firewallService.revokeIngressFwRule(firewallRule.getId(), true); + break; + } + } + return rule; + } + + private void provisionFirewallRules(final IpAddress publicIp, final Account account, int startPort, int endPort) throws NoSuchFieldException, + IllegalAccessException, ResourceUnavailableException, NetworkRuleConflictException { + List<String> sourceCidrList = new ArrayList<String>(); + sourceCidrList.add("0.0.0.0/0"); + + CreateFirewallRuleCmd rule = new CreateFirewallRuleCmd(); + rule = ComponentContext.inject(rule); + + Field addressField = rule.getClass().getDeclaredField("ipAddressId"); + addressField.setAccessible(true); + addressField.set(rule, publicIp.getId()); + + Field protocolField = rule.getClass().getDeclaredField("protocol"); + protocolField.setAccessible(true); + protocolField.set(rule, "TCP"); + + Field startPortField = rule.getClass().getDeclaredField("publicStartPort"); + startPortField.setAccessible(true); + startPortField.set(rule, startPort); + + Field endPortField = rule.getClass().getDeclaredField("publicEndPort"); + endPortField.setAccessible(true); + endPortField.set(rule, endPort); + + Field cidrField = rule.getClass().getDeclaredField("cidrlist"); + cidrField.setAccessible(true); + cidrField.set(rule, sourceCidrList); + + firewallService.createIngressFirewallRule(rule); + firewallService.applyIngressFwRules(publicIp.getId(), account); + } + + private void removePortForwardingRules(IpAddress publicIp, Network network, Account account, List<Long> removedVMIds) throws ResourceUnavailableException { + if (!CollectionUtils.isEmpty(removedVMIds)) { + for (Long vmId : removedVMIds) { + List<PortForwardingRuleVO> pfRules = portForwardingRulesDao.listByNetwork(network.getId()); + for (PortForwardingRuleVO pfRule : pfRules) { + if (pfRule.getVirtualMachineId() == vmId) { + portForwardingRulesDao.remove(pfRule.getId()); + break; + } + } + } + rulesService.applyPortForwardingRules(publicIp.getId(), account); + } + } + + private void provisionSshPortForwardingRules(KubernetesCluster kubernetesCluster, IpAddress publicIp, Network network, Account account, List<Long> clusterVMIds, int firewallRuleSourcePortStart) throws ResourceUnavailableException, + NetworkRuleConflictException { + if (!CollectionUtils.isEmpty(clusterVMIds)) { // Upscaling, add new port-forwarding rules + // Apply port forwarding only to new VMs + final long publicIpId = publicIp.getId(); + final long networkId = network.getId(); + final long accountId = account.getId(); + final long domainId = account.getDomainId(); + for (int i = 0; i < clusterVMIds.size(); ++i) { + long vmId = clusterVMIds.get(i); + Nic vmNic = networkModel.getNicInNetwork(vmId, networkId); + final Ip vmIp = new Ip(vmNic.getIPv4Address()); + final long vmIdFinal = vmId; + final int srcPortFinal = firewallRuleSourcePortStart + i; + + PortForwardingRuleVO pfRule = Transaction.execute(new TransactionCallbackWithException<PortForwardingRuleVO, NetworkRuleConflictException>() { + @Override + public PortForwardingRuleVO doInTransaction(TransactionStatus status) throws NetworkRuleConflictException { + PortForwardingRuleVO newRule = + new PortForwardingRuleVO(null, publicIpId, + srcPortFinal, srcPortFinal, + vmIp, + 22, 22, + "tcp", networkId, accountId, domainId, vmIdFinal); + newRule.setDisplay(true); + newRule.setState(FirewallRule.State.Add); + newRule = portForwardingRulesDao.persist(ne Review comment: isn't this better added as javadoc? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
