shwstppr commented on a change in pull request #3680: [WIP: DO NOT MERGE] 
CloudStack Kubernetes Service
URL: https://github.com/apache/cloudstack/pull/3680#discussion_r366210751
 
 

 ##########
 File path: 
plugins/integrations/kubernetes-service/src/main/java/com/cloud/kubernetescluster/KubernetesClusterManagerImpl.java
 ##########
 @@ -0,0 +1,3061 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package com.cloud.kubernetescluster;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.lang.reflect.Field;
+import java.math.BigInteger;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
+import java.net.Socket;
+import java.net.URL;
+import java.net.UnknownHostException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import javax.inject.Inject;
+import javax.naming.ConfigurationException;
+
+import org.apache.cloudstack.acl.ControlledEntity;
+import org.apache.cloudstack.acl.SecurityChecker;
+import org.apache.cloudstack.api.ApiConstants;
+import org.apache.cloudstack.api.ApiErrorCode;
+import org.apache.cloudstack.api.BaseCmd;
+import org.apache.cloudstack.api.ServerApiException;
+import org.apache.cloudstack.api.command.user.firewall.CreateFirewallRuleCmd;
+import 
org.apache.cloudstack.api.command.user.kubernetescluster.CreateKubernetesClusterCmd;
+import 
org.apache.cloudstack.api.command.user.kubernetescluster.DeleteKubernetesClusterCmd;
+import 
org.apache.cloudstack.api.command.user.kubernetescluster.GetKubernetesClusterConfigCmd;
+import 
org.apache.cloudstack.api.command.user.kubernetescluster.ListKubernetesClustersCmd;
+import 
org.apache.cloudstack.api.command.user.kubernetescluster.ScaleKubernetesClusterCmd;
+import 
org.apache.cloudstack.api.command.user.kubernetescluster.StartKubernetesClusterCmd;
+import 
org.apache.cloudstack.api.command.user.kubernetescluster.StopKubernetesClusterCmd;
+import 
org.apache.cloudstack.api.command.user.kubernetescluster.UpgradeKubernetesClusterCmd;
+import org.apache.cloudstack.api.command.user.vm.StartVMCmd;
+import org.apache.cloudstack.api.response.KubernetesClusterConfigResponse;
+import org.apache.cloudstack.api.response.KubernetesClusterResponse;
+import org.apache.cloudstack.api.response.ListResponse;
+import org.apache.cloudstack.ca.CAManager;
+import org.apache.cloudstack.context.CallContext;
+import 
org.apache.cloudstack.engine.orchestration.service.NetworkOrchestrationService;
+import 
org.apache.cloudstack.engine.subsystem.api.storage.ObjectInDataStoreStateMachine;
+import org.apache.cloudstack.framework.ca.Certificate;
+import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
+import org.apache.cloudstack.managed.context.ManagedContextRunnable;
+import org.apache.cloudstack.utils.security.CertUtils;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.log4j.Logger;
+
+import com.cloud.api.ApiDBUtils;
+import com.cloud.api.query.dao.TemplateJoinDao;
+import com.cloud.api.query.vo.TemplateJoinVO;
+import com.cloud.capacity.CapacityManager;
+import com.cloud.dc.ClusterDetailsDao;
+import com.cloud.dc.ClusterDetailsVO;
+import com.cloud.dc.ClusterVO;
+import com.cloud.dc.DataCenter;
+import com.cloud.dc.DataCenterVO;
+import com.cloud.dc.Pod;
+import com.cloud.dc.Vlan;
+import com.cloud.dc.VlanVO;
+import com.cloud.dc.dao.ClusterDao;
+import com.cloud.dc.dao.DataCenterDao;
+import com.cloud.dc.dao.VlanDao;
+import com.cloud.deploy.DeployDestination;
+import com.cloud.exception.ConcurrentOperationException;
+import com.cloud.exception.InsufficientCapacityException;
+import com.cloud.exception.InsufficientServerCapacityException;
+import com.cloud.exception.InvalidParameterValueException;
+import com.cloud.exception.ManagementServerException;
+import com.cloud.exception.NetworkRuleConflictException;
+import com.cloud.exception.PermissionDeniedException;
+import com.cloud.exception.ResourceAllocationException;
+import com.cloud.exception.ResourceUnavailableException;
+import com.cloud.host.Host.Type;
+import com.cloud.host.HostVO;
+import com.cloud.hypervisor.Hypervisor;
+import com.cloud.kubernetescluster.dao.KubernetesClusterDao;
+import com.cloud.kubernetescluster.dao.KubernetesClusterDetailsDao;
+import com.cloud.kubernetescluster.dao.KubernetesClusterVmMapDao;
+import com.cloud.kubernetesversion.KubernetesSupportedVersion;
+import com.cloud.kubernetesversion.KubernetesSupportedVersionVO;
+import com.cloud.kubernetesversion.KubernetesVersionManagerImpl;
+import com.cloud.kubernetesversion.dao.KubernetesSupportedVersionDao;
+import com.cloud.network.IpAddress;
+import com.cloud.network.IpAddressManager;
+import com.cloud.network.Network;
+import com.cloud.network.Network.Service;
+import com.cloud.network.NetworkModel;
+import com.cloud.network.NetworkService;
+import com.cloud.network.PhysicalNetwork;
+import com.cloud.network.addr.PublicIp;
+import com.cloud.network.dao.FirewallRulesDao;
+import com.cloud.network.dao.IPAddressDao;
+import com.cloud.network.dao.IPAddressVO;
+import com.cloud.network.dao.NetworkDao;
+import com.cloud.network.dao.NetworkVO;
+import com.cloud.network.dao.PhysicalNetworkDao;
+import com.cloud.network.firewall.FirewallService;
+import com.cloud.network.lb.LoadBalancingRulesService;
+import com.cloud.network.rules.FirewallRule;
+import com.cloud.network.rules.FirewallRuleVO;
+import com.cloud.network.rules.LoadBalancer;
+import com.cloud.network.rules.PortForwardingRuleVO;
+import com.cloud.network.rules.RulesService;
+import com.cloud.network.rules.dao.PortForwardingRulesDao;
+import com.cloud.offering.NetworkOffering;
+import com.cloud.offering.ServiceOffering;
+import com.cloud.offerings.NetworkOfferingVO;
+import com.cloud.offerings.dao.NetworkOfferingDao;
+import com.cloud.offerings.dao.NetworkOfferingServiceMapDao;
+import com.cloud.org.Grouping;
+import com.cloud.resource.ResourceManager;
+import com.cloud.service.ServiceOfferingVO;
+import com.cloud.service.dao.ServiceOfferingDao;
+import com.cloud.storage.Storage;
+import com.cloud.storage.VMTemplateVO;
+import com.cloud.storage.VMTemplateZoneVO;
+import com.cloud.storage.dao.VMTemplateDao;
+import com.cloud.storage.dao.VMTemplateZoneDao;
+import com.cloud.template.TemplateApiService;
+import com.cloud.template.VirtualMachineTemplate;
+import com.cloud.user.Account;
+import com.cloud.user.AccountManager;
+import com.cloud.user.AccountService;
+import com.cloud.user.SSHKeyPairVO;
+import com.cloud.user.User;
+import com.cloud.user.dao.AccountDao;
+import com.cloud.user.dao.SSHKeyPairDao;
+import com.cloud.uservm.UserVm;
+import com.cloud.utils.Pair;
+import com.cloud.utils.component.ComponentContext;
+import com.cloud.utils.component.ManagerBase;
+import com.cloud.utils.concurrency.NamedThreadFactory;
+import com.cloud.utils.db.Filter;
+import com.cloud.utils.db.GlobalLock;
+import com.cloud.utils.db.SearchCriteria;
+import com.cloud.utils.db.Transaction;
+import com.cloud.utils.db.TransactionCallback;
+import com.cloud.utils.db.TransactionCallbackNoReturn;
+import com.cloud.utils.db.TransactionCallbackWithException;
+import com.cloud.utils.db.TransactionStatus;
+import com.cloud.utils.exception.CloudRuntimeException;
+import com.cloud.utils.fsm.NoTransitionException;
+import com.cloud.utils.fsm.StateMachine2;
+import com.cloud.utils.net.Ip;
+import com.cloud.utils.net.NetUtils;
+import com.cloud.utils.ssh.SshHelper;
+import com.cloud.vm.Nic;
+import com.cloud.vm.ReservationContext;
+import com.cloud.vm.ReservationContextImpl;
+import com.cloud.vm.UserVmManager;
+import com.cloud.vm.UserVmService;
+import com.cloud.vm.UserVmVO;
+import com.cloud.vm.VMInstanceVO;
+import com.cloud.vm.VirtualMachine;
+import com.cloud.vm.dao.UserVmDao;
+import com.cloud.vm.dao.VMInstanceDao;
+import com.google.common.base.Strings;
+
+public class KubernetesClusterManagerImpl extends ManagerBase implements 
KubernetesClusterService {
+
+    private static final Logger LOGGER = 
Logger.getLogger(KubernetesClusterManagerImpl.class);
+
+    protected StateMachine2<KubernetesCluster.State, KubernetesCluster.Event, 
KubernetesCluster> _stateMachine = KubernetesCluster.State.getStateMachine();
+
+    ScheduledExecutorService _gcExecutor;
+    ScheduledExecutorService _stateScanner;
+
+    @Inject
+    protected KubernetesClusterDao kubernetesClusterDao;
+    @Inject
+    protected KubernetesClusterVmMapDao kubernetesClusterVmMapDao;
+    @Inject
+    protected KubernetesClusterDetailsDao kubernetesClusterDetailsDao;
+    @Inject
+    protected KubernetesSupportedVersionDao kubernetesSupportedVersionDao;
+    @Inject
+    protected CAManager caManager;
+    @Inject
+    protected SSHKeyPairDao sshKeyPairDao;
+    @Inject
+    protected DataCenterDao dataCenterDao;
+    @Inject
+    protected ClusterDao clusterDao;
+    @Inject
+    protected ClusterDetailsDao clusterDetailsDao;
+    @Inject
+    protected ServiceOfferingDao serviceOfferingDao;
+    @Inject
+    protected VMTemplateDao templateDao;
+    @Inject
+    protected TemplateApiService templateService;
+    @Inject
+    protected VMTemplateZoneDao templateZoneDao;
+    @Inject
+    protected TemplateJoinDao templateJoinDao;
+    @Inject
+    protected AccountService accountService;
+    @Inject
+    protected AccountDao accountDao;
+    @Inject
+    protected AccountManager accountManager;
+    @Inject
+    protected VMInstanceDao vmInstanceDao;
+    @Inject
+    protected UserVmDao userVmDao;
+    @Inject
+    protected UserVmService userVmService;
+    @Inject
+    protected UserVmManager userVmManager;
+    @Inject
+    protected ConfigurationDao globalConfigDao;
+    @Inject
+    protected NetworkOfferingDao networkOfferingDao;
+    @Inject
+    protected NetworkService networkService;
+    @Inject
+    protected NetworkModel networkModel;
+    @Inject
+    protected PhysicalNetworkDao physicalNetworkDao;
+    @Inject
+    protected NetworkOrchestrationService networkMgr;
+    @Inject
+    protected NetworkDao networkDao;
+    @Inject
+    protected IPAddressDao ipAddressDao;
+    @Inject
+    protected PortForwardingRulesDao portForwardingRulesDao;
+    @Inject
+    protected FirewallService firewallService;
+    @Inject
+    protected RulesService rulesService;
+    @Inject
+    protected NetworkOfferingServiceMapDao networkOfferingServiceMapDao;
+    @Inject
+    protected CapacityManager capacityManager;
+    @Inject
+    protected ResourceManager resourceManager;
+    @Inject
+    protected FirewallRulesDao firewallRulesDao;
+    @Inject
+    protected IpAddressManager ipAddressManager;
+    @Inject
+    protected LoadBalancingRulesService lbService;
+    @Inject
+    protected VlanDao vlanDao;
+
+    private static final String CLUSTER_NODE_VM_USER = "core";
+    private static final int CLUSTER_API_PORT = 6443;
+    private static final int CLUSTER_NODES_DEFAULT_START_SSH_PORT = 2222;
+
+    private static String getStackTrace(final Throwable throwable) {
+        final StringWriter sw = new StringWriter();
+        final PrintWriter pw = new PrintWriter(sw, true);
+        throwable.printStackTrace(pw);
+        return sw.getBuffer().toString();
+    }
+
+    private String readResourceFile(String resource) throws IOException {
+        return 
IOUtils.toString(Objects.requireNonNull(Thread.currentThread().getContextClassLoader().getResourceAsStream(resource)),
 Charset.defaultCharset().name());
+    }
+
+    private boolean isKubernetesServiceConfigured(DataCenter zone) {
+        // Check Kubernetes VM template for zone
+        String templateName = 
globalConfigDao.getValue(KubernetesServiceConfig.KubernetesClusterTemplateName.key());
+        if (templateName == null || templateName.isEmpty()) {
+            LOGGER.warn(String.format("Global setting %s is empty. Template 
name need to be specified for Kubernetes service to function", 
KubernetesServiceConfig.KubernetesClusterTemplateName.key()));
+            return false;
+        }
+        final VMTemplateVO template = 
templateDao.findByTemplateName(templateName);
+        if (template == null) {
+            LOGGER.warn(String.format("Unable to find the template %s to be 
used for provisioning Kubernetes cluster", templateName));
+            return false;
+        }
+        // Check network offering
+        String networkOfferingName = 
globalConfigDao.getValue(KubernetesServiceConfig.KubernetesClusterNetworkOffering.key());
+        if (networkOfferingName == null || networkOfferingName.isEmpty()) {
+            LOGGER.warn(String.format("Global setting %s is empty. Admin has 
not yet specified the network offering to be used for provisioning isolated 
network for the cluster", 
KubernetesServiceConfig.KubernetesClusterNetworkOffering.key()));
+            return false;
+        }
+        NetworkOfferingVO networkOffering = 
networkOfferingDao.findByUniqueName(networkOfferingName);
+        if (networkOffering == null) {
+            LOGGER.warn(String.format("Unable to find the network offering %s 
to be used for provisioning Kubernetes cluster", networkOfferingName));
+            return false;
+        }
+        if (networkOffering.getState() == NetworkOffering.State.Disabled) {
+            LOGGER.warn(String.format("Network offering ID: %s is not 
enabled", networkOffering.getUuid()));
+            return false;
+        }
+        List<String> services = 
networkOfferingServiceMapDao.listServicesForNetworkOffering(networkOffering.getId());
+        if (services == null || services.isEmpty() || 
!services.contains("SourceNat")) {
+            LOGGER.warn(String.format("Network offering ID: %s does not have 
necessary services to provision Kubernetes cluster", 
networkOffering.getUuid()));
+            return false;
+        }
+        if (!networkOffering.isEgressDefaultPolicy()) {
+            LOGGER.warn(String.format("Network offering ID: %s has egress 
default policy turned off should be on to provision Kubernetes cluster", 
networkOffering.getUuid()));
+            return false;
+        }
+        long physicalNetworkId = 
networkModel.findPhysicalNetworkId(zone.getId(), networkOffering.getTags(), 
networkOffering.getTrafficType());
+        PhysicalNetwork physicalNetwork = 
physicalNetworkDao.findById(physicalNetworkId);
+        if (physicalNetwork == null) {
+            LOGGER.warn(String.format("Unable to find physical network with 
tag: %s", networkOffering.getTags()));
+            return false;
+        }
+        return true;
+    }
+
+    private File getManagementServerSshPublicKeyFile() {
+        boolean devel = 
Boolean.parseBoolean(globalConfigDao.getValue("developer"));
+        String keyFile = String.format("%s/.ssh/id_rsa", 
System.getProperty("user.home"));
+        if (devel) {
+            keyFile += ".cloud";
+        }
+        return new File(keyFile);
+    }
+
+    private String generateClusterToken(KubernetesCluster kubernetesCluster) {
+        if (kubernetesCluster == null) return "";
+        String token = kubernetesCluster.getUuid();
+        token = token.replaceAll("-", "");
+        token = token.substring(0, 22);
+        token = token.substring(0, 6) + "." + token.substring(6);
+        return token;
+    }
+
+    private String generateClusterHACertificateKey(KubernetesCluster 
kubernetesCluster) {
+        if (kubernetesCluster == null) return "";
+        String uuid = kubernetesCluster.getUuid();
+        StringBuilder token = new StringBuilder(uuid.replaceAll("-", ""));
+        while (token.length() < 64) {
+            token.append(token);
+        }
+        return token.toString().substring(0, 64);
+    }
+
+    private KubernetesClusterVmMapVO addKubernetesClusterVm(final long 
kubernetesClusterId, final long vmId) {
+        return Transaction.execute(new 
TransactionCallback<KubernetesClusterVmMapVO>() {
+            @Override
+            public KubernetesClusterVmMapVO doInTransaction(TransactionStatus 
status) {
+                KubernetesClusterVmMapVO newClusterVmMap = new 
KubernetesClusterVmMapVO(kubernetesClusterId, vmId);
+                kubernetesClusterVmMapDao.persist(newClusterVmMap);
+                return newClusterVmMap;
+            }
+        });
+    }
+
+    private boolean isKubernetesClusterServerRunning(KubernetesCluster 
kubernetesCluster, String ipAddress, int retries, long waitDuration) {
+        int retryCounter = 0;
+        boolean k8sApiServerSetup = false;
+        while (retryCounter < retries) {
+            try {
+                String versionOutput = IOUtils.toString(new 
URL(String.format("https://%s:%d/version";, ipAddress, CLUSTER_API_PORT)), 
StandardCharsets.UTF_8);
+                if (!Strings.isNullOrEmpty(versionOutput)) {
+                    LOGGER.debug(String.format("Kubernetes cluster ID: %s API 
has been successfully provisioned, %s", kubernetesCluster.getUuid(), 
versionOutput));
+                    k8sApiServerSetup = true;
+                    break;
+                }
+            } catch (Exception e) {
+                LOGGER.warn(String.format("API endpoint for Kubernetes cluster 
ID: %s not available. Attempt: %d/%d", kubernetesCluster.getUuid(), 
retryCounter+1, retries), e);
+            }
+            try {
+                Thread.sleep(waitDuration);
+            } catch (InterruptedException ie) {
+                LOGGER.error(String.format("Error while waiting for Kubernetes 
cluster ID: %s API endpoint to be available", kubernetesCluster.getUuid()), ie);
+            }
+            retryCounter++;
+        }
+        return k8sApiServerSetup;
+    }
+
+    private String getKubernetesClusterConfig(KubernetesCluster 
kubernetesCluster, String ipAddress, int port, int retries) {
+        int retryCounter = 0;
+        String kubeConfig = "";
+        while (retryCounter < retries) {
+            try {
+                Pair<Boolean, String> result = SshHelper.sshExecute(ipAddress, 
port, CLUSTER_NODE_VM_USER,
+                        getManagementServerSshPublicKeyFile(), null, "sudo cat 
/etc/kubernetes/admin.conf",
+                        10000, 10000, 10000);
+
+                if (result.first() && !Strings.isNullOrEmpty(result.second())) 
{
+                    kubeConfig = result.second();
+                    break;
+                }
+            } catch (Exception e) {
+                LOGGER.warn(String.format("Failed to retrieve kube-config file 
for Kubernetes cluster ID: %s. Attempt: %d/%d", kubernetesCluster.getUuid(), 
retryCounter+1, retries), e);
+            }
+            retryCounter++;
+        }
+        return kubeConfig;
+    }
+
+    private boolean isKubernetesClusterAddOnServiceRunning(KubernetesCluster 
kubernetesCluster, final String ipAddress, final int port, final String 
namespace, String serviceName) {
+        try {
+            String cmd = "sudo kubectl get pods --all-namespaces";
+            if (!Strings.isNullOrEmpty(namespace)) {
+                cmd = String.format("sudo kubectl get pods --namespace=%s", 
namespace);
+            }
+            Pair<Boolean, String> result = SshHelper.sshExecute(ipAddress, 
port, CLUSTER_NODE_VM_USER,
+                    getManagementServerSshPublicKeyFile(), null, cmd,
+                    10000, 10000, 10000);
+            if (result.first() && !Strings.isNullOrEmpty(result.second())) {
+                String[] lines = result.second().split("\n");
+                for (String line :
+                        lines) {
+                    if (line.contains(serviceName) && 
line.contains("Running")) {
+                        LOGGER.debug(String.format("Service : %s in namespace: 
%s for the Kubernetes cluster ID: %s is running",serviceName, namespace, 
kubernetesCluster.getUuid()));
+                        return true;
+                    }
+                }
+            }
+        } catch (Exception e) {
+            LOGGER.warn(String.format("Unable to retrieve service: %s running 
status in namespace %s for Kubernetes cluster ID: %s", serviceName, namespace, 
kubernetesCluster.getUuid()), e);
+        }
+        return false;
+    }
+
+    private boolean 
isKubernetesClusterDashboardServiceRunning(KubernetesCluster kubernetesCluster, 
String ipAddress, int port, int retries, long waitDuration) {
+        boolean running = false;
+        int retryCounter = 0;
+        // Check if dashboard service is up running.
+        while (retryCounter < retries) {
+            LOGGER.debug(String.format("Checking dashboard service for the 
Kubernetes cluster ID: %s to come up. Attempt: %d/%d", 
kubernetesCluster.getUuid(), retryCounter+1, retries));
+            if (isKubernetesClusterAddOnServiceRunning(kubernetesCluster, 
ipAddress, port, "kubernetes-dashboard", "kubernetes-dashboard")) {
+                running = true;
+                break;
+            }
+            try {
+                Thread.sleep(waitDuration);
+            } catch (InterruptedException ex) {
+                LOGGER.error(String.format("Error while waiting for Kubernetes 
cluster: %s API dashboard service to be available", 
kubernetesCluster.getUuid()), ex);
+            }
+            retryCounter++;
+        }
+        return running;
+    }
+
+    private Pair<String, Integer> 
getKubernetesClusterServerIpSshPort(KubernetesCluster kubernetesCluster, UserVm 
masterVm) {
+        int port = CLUSTER_NODES_DEFAULT_START_SSH_PORT;
+        KubernetesClusterDetailsVO detail = 
kubernetesClusterDetailsDao.findDetail(kubernetesCluster.getId(), 
ApiConstants.EXTERNAL_LOAD_BALANCER_IP_ADDRESS);
+        if (detail != null && !Strings.isNullOrEmpty(detail.getValue())) {
+            return new Pair<>(detail.getValue(), port);
+        }
+        Network network = 
networkDao.findById(kubernetesCluster.getNetworkId());
+        if (network == null) {
+            LOGGER.warn(String.format("Network for Kubernetes cluster ID: %s 
cannot be found", kubernetesCluster.getUuid()));
+            return new Pair<>(null, port);
+        }
+        if (Network.GuestType.Isolated.equals(network.getGuestType())) {
+            List<? extends IpAddress> addresses = 
networkModel.listPublicIpsAssignedToGuestNtwk(network.getId(), true);
+            if (CollectionUtils.isEmpty(addresses)) {
+                LOGGER.warn(String.format("No public IP addresses found for 
network ID: %s, Kubernetes cluster ID: %s", network.getUuid(), 
kubernetesCluster.getUuid()));
+                return new Pair<>(null, port);
+            }
+            for (IpAddress address : addresses) {
+                if (address.isSourceNat()) {
+                    return new Pair<>(address.getAddress().addr(), port);
+                }
+            }
+            LOGGER.warn(String.format("No source NAT IP addresses found for 
network ID: %s, Kubernetes cluster ID: %s", network.getUuid(), 
kubernetesCluster.getUuid()));
+            return new Pair<>(null, port);
+        } else if (Network.GuestType.Shared.equals(network.getGuestType())) {
+            port = 22;
+            if (masterVm == null) {
+                List<KubernetesClusterVmMapVO> clusterVMs = 
kubernetesClusterVmMapDao.listByClusterId(kubernetesCluster.getId());
+                if (CollectionUtils.isEmpty(clusterVMs)) {
+                    LOGGER.warn(String.format("Unable to retrieve VMs for 
Kubernetes cluster ID: %s", kubernetesCluster.getUuid()));
+                    return new Pair<>(null, port);
+                }
+                List<Long> vmIds = new ArrayList<>();
+                for (KubernetesClusterVmMapVO vmMap : clusterVMs) {
+                    vmIds.add(vmMap.getVmId());
+                }
+                Collections.sort(vmIds);
+                masterVm = userVmDao.findById(vmIds.get(0));
+            }
+            if (masterVm == null) {
+                LOGGER.warn(String.format("Unable to retrieve master VM for 
Kubernetes cluster ID: %s", kubernetesCluster.getUuid()));
+                return new Pair<>(null, port);
+            }
+            return new Pair<>(masterVm.getPrivateIpAddress(), port);
+        }
+        LOGGER.warn(String.format("Unable to retrieve server IP address for 
Kubernetes cluster ID: %s", kubernetesCluster.getUuid()));
+        return  new Pair<>(null, port);
+    }
+
+    private Pair<String, Integer> 
getKubernetesClusterServerIpSshPort(KubernetesCluster kubernetesCluster) {
+        return getKubernetesClusterServerIpSshPort(kubernetesCluster, null);
+    }
+
+    private int getKubernetesClusterReadyNodesCount(KubernetesCluster 
kubernetesCluster, String ipAddress, int port) throws Exception {
+        Pair<Boolean, String> result = SshHelper.sshExecute(ipAddress, port,
+                CLUSTER_NODE_VM_USER, getManagementServerSshPublicKeyFile(), 
null,
+                "sudo kubectl get nodes | awk '{if ($2 == \"Ready\") print 
$1}' | wc -l",
+                10000, 10000, 20000);
+        if (result.first()) {
+            return Integer.parseInt(result.second().trim().replace("\"", ""));
+        }
+        return 0;
+    }
+
+    private boolean isKubernetesClusterNodeReady(KubernetesCluster 
kubernetesCluster, String ipAddress, int port, String nodeName) throws 
Exception {
+        Pair<Boolean, String> result = SshHelper.sshExecute(ipAddress, port,
+                CLUSTER_NODE_VM_USER, getManagementServerSshPublicKeyFile(), 
null,
+                String.format("sudo kubectl get nodes | awk '{if ($1 == \"%s\" 
&& $2 == \"Ready\") print $1}'", nodeName),
+                10000, 10000, 20000);
+        return result.first() && nodeName.equals(result.second().trim());
+    }
+
+    private boolean isKubernetesClusterNodeReady(KubernetesCluster 
kubernetesCluster, String ipAddress, int port, String nodeName, int retries, 
int waitDuration) {
+        int retryCounter = 0;
+        while (retryCounter < retries) {
+            boolean ready = false;
+            try {
+                ready = isKubernetesClusterNodeReady(kubernetesCluster, 
ipAddress, port, nodeName);
+            } catch (Exception e) {
+                LOGGER.warn(String.format("Failed to retrieve state of node: 
%s in Kubernetes cluster ID: %s", nodeName, kubernetesCluster.getUuid()), e);
+            }
+            if (ready) {
+                return true;
+            }
+            try {
+                Thread.sleep(waitDuration);
+            } catch (InterruptedException ie) {
+                LOGGER.error(String.format("Error while waiting for Kubernetes 
cluster ID: %s node: %s to become ready", kubernetesCluster.getUuid(), 
nodeName), ie);
+            }
+            retryCounter++;
+        }
+        return false;
+    }
+
+    private int getKubernetesClusterReadyNodesCount(KubernetesCluster 
kubernetesCluster) throws Exception {
+        Pair<String, Integer> ipSshPort = 
getKubernetesClusterServerIpSshPort(kubernetesCluster);
+        String ipAddress = ipSshPort.first();
+        int sshPort = ipSshPort.second();
+        if (Strings.isNullOrEmpty(ipAddress)) {
+            String msg = String.format("No public IP found for Kubernetes 
cluster ID: %s" , kubernetesCluster.getUuid());
+            LOGGER.warn(msg);
+            throw new ManagementServerException(msg);
+        }
+        return getKubernetesClusterReadyNodesCount(kubernetesCluster, 
ipAddress, sshPort);
+    }
+
+    private boolean validateKubernetesClusterReadyNodesCount(KubernetesCluster 
kubernetesCluster, String ipAddress, int port, int retries, long waitDuration) {
+        int retryCounter = 0;
+        while (retryCounter < retries) {
+            // "sudo kubectl get nodes -o json | jq \".items[].metadata.name\" 
| wc -l"
+            LOGGER.debug(String.format("Checking ready nodes for the 
Kubernetes cluster ID: %s with total %d provisioned nodes. Attempt: %d/%d", 
kubernetesCluster.getUuid(), kubernetesCluster.getTotalNodeCount(), 
retryCounter+1, retries));
+            try {
+                int nodesCount = 
getKubernetesClusterReadyNodesCount(kubernetesCluster, ipAddress, port);
+                if (nodesCount == kubernetesCluster.getTotalNodeCount()) {
+                    LOGGER.debug(String.format("Kubernetes cluster ID: %s has 
%d ready now", kubernetesCluster.getUuid(), 
kubernetesCluster.getTotalNodeCount()));
+                    return true;
+                } else {
+                    LOGGER.debug(String.format("Kubernetes cluster ID: %s has 
total %d provisioned nodes while %d ready now", kubernetesCluster.getUuid(), 
kubernetesCluster.getTotalNodeCount(), nodesCount));
+                }
+            } catch (Exception e) {
+                LOGGER.warn(String.format("Failed to retrieve ready node count 
for Kubernetes cluster ID: %s", kubernetesCluster.getUuid()), e);
+            }
+            try {
+                Thread.sleep(waitDuration);
+            } catch (InterruptedException ex) {
+                LOGGER.warn(String.format("Error while waiting during 
Kubernetes cluster ID: %s ready node check. %d/%d", 
kubernetesCluster.getUuid(), retryCounter+1, retries), ex);
+            }
+            retryCounter++;
+        }
+        return false;
+    }
+
+    private boolean removeKubernetesClusterNode(KubernetesCluster 
kubernetesCluster, String ipAddress, int port, UserVm userVm, int retries, int 
waitDuration) {
+        File pkFile = getManagementServerSshPublicKeyFile();
+        int retryCounter = 0;
+        while (retryCounter < retries) {
+            retryCounter++;
+            try {
+                Pair<Boolean, String> result = SshHelper.sshExecute(ipAddress, 
port, CLUSTER_NODE_VM_USER,
+                        pkFile, null, String.format("sudo kubectl drain %s 
--ignore-daemonsets --delete-local-data", userVm.getHostName()),
+                        10000, 10000, 60000);
+                if (!result.first()) {
+                    LOGGER.warn(String.format("Draining node: %s on VM ID: %s 
in Kubernetes cluster ID: %s unsuccessful", userVm.getHostName(), 
userVm.getUuid(), kubernetesCluster.getUuid()));
+                } else {
+                    result = SshHelper.sshExecute(ipAddress, port, 
CLUSTER_NODE_VM_USER,
+                            pkFile, null, String.format("sudo kubectl delete 
node %s", userVm.getHostName()),
+                            10000, 10000, 30000);
+                    if (result.first()) {
+                        return true;
+                    } else {
+                        LOGGER.warn(String.format("Deleting node: %s on VM ID: 
%s in Kubernetes cluster ID: %s unsuccessful", userVm.getHostName(), 
userVm.getUuid(), kubernetesCluster.getUuid()));
+                    }
+                }
+                break;
+            } catch (Exception e) {
+                String msg = String.format("Failed to remove Kubernetes 
cluster ID: %s node: %s on VM ID: %s", kubernetesCluster.getUuid(), 
userVm.getHostName(), userVm.getUuid());
+                LOGGER.warn(msg, e);
+            }
+            try {
+                Thread.sleep(waitDuration);
+            } catch (InterruptedException ie) {
+                LOGGER.error(String.format("Error while waiting for Kubernetes 
cluster ID: %s node: %s on VM ID: %s removal", kubernetesCluster.getUuid(), 
userVm.getHostName(), userVm.getUuid()), ie);
+            }
+            retryCounter++;
+        }
+        return false;
+    }
+
+    private boolean uncordonKubernetesClusterNode(KubernetesCluster 
kubernetesCluster, String ipAddress, int port, UserVm userVm, int retries, int 
waitDuration) {
+        int retryCounter = 0;
+        while (retryCounter < retries) {
+            Pair<Boolean, String> result = null;
+            try {
+                result = SshHelper.sshExecute(ipAddress, port, 
CLUSTER_NODE_VM_USER, getManagementServerSshPublicKeyFile(), null,
+                        String.format("sudo kubectl uncordon %s", 
userVm.getHostName()),
+                        10000, 10000, 30000);
+                if (result.first()) {
+                    return true;
+                }
+            } catch (Exception e) {
 
 Review comment:
   I'm afraid refactoring the SshHelper class can't be done in this PR :(

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to