This is an automated email from the ASF dual-hosted git repository.

jiafengzheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris-manager.git


The following commit(s) were added to refs/heads/master by this push:
     new f217049  node agent async start and stop (#37)
f217049 is described below

commit f2170493a21a706e602495d2ff8c04a5e2588cde
Author: LiRui <[email protected]>
AuthorDate: Fri Apr 8 15:08:37 2022 +0800

    node agent async start and stop (#37)
    
    node agent async start and stop
---
 .../config/AgentUnInstallEventConfigInfo.java      |  41 ++++
 .../heartbeat/stage/AgentUnInstallEventStage.java  |  38 ++++
 .../component/DorisManagerUserSpaceComponent.java  |  38 ++--
 .../stack/control/manager/DorisClusterManager.java |   7 +-
 .../control/manager/ResourceClusterManager.java    | 138 +++++++++++-
 .../manager/ResourceNodeAndAgentManager.java       | 245 ++++++++++++++++-----
 .../org/apache/doris/stack/shell/BaseCommand.java  |  20 +-
 .../java/org/apache/doris/stack/shell/SSH.java     |   4 +-
 .../org/apache/doris/stack/util/Constants.java     |   2 +
 .../doris/stack/service/PaloUserSpaceService.java  |   6 +-
 .../doris/stack/dao/ClusterInstanceRepository.java |   4 +
 .../stack/dao/ClusterModuleServiceRepository.java  |   2 +
 12 files changed, 463 insertions(+), 82 deletions(-)

diff --git a/manager/dm-common/src/main/java/org/apache/doris/manager/common/heartbeat/config/AgentUnInstallEventConfigInfo.java b/manager/dm-common/src/main/java/org/apache/doris/manager/common/heartbeat/config/AgentUnInstallEventConfigInfo.java
new file mode 100644
index 0000000..963c729
--- /dev/null
+++ b/manager/dm-common/src/main/java/org/apache/doris/manager/common/heartbeat/config/AgentUnInstallEventConfigInfo.java
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.manager.common.heartbeat.config;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class AgentUnInstallEventConfigInfo {
+    private String sshUser;
+
+    private int sshPort;
+
+    private String sshKey;
+
+    private String host;
+
+    private String installDir;
+
+    private long agentNodeId;
+
+    private int agentPort;
+}
diff --git a/manager/dm-common/src/main/java/org/apache/doris/manager/common/heartbeat/stage/AgentUnInstallEventStage.java b/manager/dm-common/src/main/java/org/apache/doris/manager/common/heartbeat/stage/AgentUnInstallEventStage.java
new file mode 100644
index 0000000..7c0a721
--- /dev/null
+++ b/manager/dm-common/src/main/java/org/apache/doris/manager/common/heartbeat/stage/AgentUnInstallEventStage.java
@@ -0,0 +1,38 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.manager.common.heartbeat.stage;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+
+@AllArgsConstructor
+@NoArgsConstructor
+@Getter
+public enum AgentUnInstallEventStage {
+    AGENT_STOP("agent stop failed", "agent stop succeeded", 1, true);
+
+    private String error;
+
+    private String message;
+
+    private int stage;
+
+    private boolean isLast;
+
+}
diff --git a/manager/dm-server/src/main/java/org/apache/doris/stack/component/DorisManagerUserSpaceComponent.java b/manager/dm-server/src/main/java/org/apache/doris/stack/component/DorisManagerUserSpaceComponent.java
index 2ecc6b4..8d75065 100644
--- a/manager/dm-server/src/main/java/org/apache/doris/stack/component/DorisManagerUserSpaceComponent.java
+++ b/manager/dm-server/src/main/java/org/apache/doris/stack/component/DorisManagerUserSpaceComponent.java
@@ -26,6 +26,7 @@ import org.apache.doris.stack.constant.ConstantDef;
 import org.apache.doris.stack.control.ModelControlLevel;
 import org.apache.doris.stack.control.ModelControlRequestType;
 import org.apache.doris.stack.control.manager.DorisClusterManager;
+import org.apache.doris.stack.control.manager.ResourceClusterManager;
 import org.apache.doris.stack.dao.ClusterInfoRepository;
 import org.apache.doris.stack.dao.ClusterUserMembershipRepository;
 import org.apache.doris.stack.dao.CoreUserRepository;
@@ -95,6 +96,9 @@ public class DorisManagerUserSpaceComponent extends BaseService {
     @Autowired
     private ClusterInfoRepository clusterInfoRepository;
 
+    @Autowired
+    private ResourceClusterManager resourceClusterManager;
+
     @Autowired
     private PaloLoginClient paloLoginClient;
 
@@ -447,24 +451,27 @@ public class DorisManagerUserSpaceComponent extends BaseService {
         // delete cluster information
         clusterInfoRepository.deleteById(spaceId);
 
-        try {
-            // delete cluster configuration
-            log.debug("delete cluster {} config infos.", spaceId);
-            settingComponent.deleteAdminSetting(spaceId);
+        // delete cluster configuration
+        log.debug("delete cluster {} config infos.", spaceId);
+        settingComponent.deleteAdminSetting(spaceId);
 
-            deleteClusterPermissionInfo(clusterInfo);
+        deleteClusterPermissionInfo(clusterInfo);
 
-            // delete user information
-            log.debug("delete cluster {} all user membership.", spaceId);
-            clusterUserMembershipRepository.deleteByClusterId(spaceId);
+        // delete user information
+        log.debug("delete cluster {} all user membership.", spaceId);
+        clusterUserMembershipRepository.deleteByClusterId(spaceId);
 
-            // TODO: In order to be compatible with the deleted content of 
spatial information before, it is put here.
-            //  If the interface that releases both cluster and physical 
resources is implemented later,
-            //  it will be unified in the current doriscluster processing 
operation
-            clusterManager.deleteClusterOperation(clusterInfo);
-        } catch (Exception e) {
-            log.warn("delete space {} related information failed", spaceId, e);
+        // TODO: In order to be compatible with the deleted content of spatial 
information before, it is put here.
+        //  If the interface that releases both cluster and physical resources 
is implemented later,
+        //  it will be unified in the current doriscluster processing operation
+        clusterManager.deleteClusterOperation(clusterInfo);
+
+        if (clusterInfo.getResourceClusterId() < 1L) {
+            log.info("resource cluster has not been created");
+            return;
         }
+        
resourceClusterManager.deleteAgentsOperation(clusterInfo.getResourceClusterId());
+        
resourceClusterManager.deleteOperation(clusterInfo.getResourceClusterId());
     }
 
     private void deleteClusterPermissionInfo(ClusterInfoEntity clusterInfo) 
throws Exception {
@@ -489,7 +496,8 @@ public class DorisManagerUserSpaceComponent extends BaseService {
             managerMetaSyncComponent.deleteClusterMetadata(clusterInfo);
 
             log.debug("Delete cluster {} analyzer user {}.", spaceId, 
allUserGroup.getPaloUserName());
-            queryClient.deleteUser(ConstantDef.DORIS_DEFAULT_NS, 
ConstantDef.MYSQL_DEFAULT_SCHEMA, clusterInfo, allUserGroup.getPaloUserName());
+            queryClient.deleteUser(ConstantDef.DORIS_DEFAULT_NS, 
ConstantDef.MYSQL_DEFAULT_SCHEMA, clusterInfo,
+                    allUserGroup.getPaloUserName());
         }
 
         // After deleting the user's space, set clusterid to 0
diff --git a/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/DorisClusterManager.java b/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/DorisClusterManager.java
index e655206..da078ef 100644
--- a/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/DorisClusterManager.java
+++ b/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/DorisClusterManager.java
@@ -80,13 +80,13 @@ public class DorisClusterManager {
     private JdbcSampleClient jdbcClient;
 
     // Ensure the data atomicity of creating user space, so add transactions
-    @Transactional
+    @Transactional(rollbackFor = Exception.class)
     public long initOperation(NewUserSpaceCreateReq spaceInfo, String creator) 
throws Exception {
         return userSpaceComponent.create(spaceInfo, creator);
     }
 
     // Ensure the atomicity of data in user space, so add transactions
-    @Transactional
+    @Transactional(rollbackFor = Exception.class)
     public void updateClusterOperation(CoreUserEntity user, long clusterId,
                                        NewUserSpaceCreateReq spaceInfo) throws 
Exception {
         userSpaceComponent.update(user, clusterId, spaceInfo);
@@ -319,7 +319,7 @@ public class DorisClusterManager {
 
     public void deleteClusterOperation(ClusterInfoEntity clusterInfo)throws 
Exception {
         long clusterId = clusterInfo.getId();
-        log.info("Delete cluster {} instances operation.", clusterId);
+        log.info("Delete {} cluster {} instances operation.", clusterId, 
clusterInfo.getName());
         deleteClusterOperation(clusterId);
     }
 
@@ -330,5 +330,4 @@ public class DorisClusterManager {
             clusterModuleManager.deleteOperation(moduleEntity);
         }
     }
-
 }
diff --git a/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/ResourceClusterManager.java b/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/ResourceClusterManager.java
index 2451aef..75fd80b 100644
--- a/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/ResourceClusterManager.java
+++ b/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/ResourceClusterManager.java
@@ -19,7 +19,9 @@ package org.apache.doris.stack.control.manager;
 
 import com.alibaba.fastjson.JSON;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.tuple.Pair;
 import 
org.apache.doris.manager.common.heartbeat.config.AgentInstallEventConfigInfo;
+import 
org.apache.doris.manager.common.heartbeat.config.AgentUnInstallEventConfigInfo;
 import org.apache.doris.stack.dao.ResourceClusterRepository;
 import org.apache.doris.stack.dao.ResourceNodeRepository;
 import org.apache.doris.stack.entity.ResourceClusterEntity;
@@ -29,7 +31,11 @@ import org.apache.doris.stack.util.ListUtil;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
+import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 
 @Slf4j
 @Component
@@ -74,6 +80,8 @@ public class ResourceClusterManager {
         List<String> reduceList = ListUtil.getReduceList(hosts, existHosts);
         log.debug("resource cluster {} reduce nodes", reduceList);
         for (String host : reduceList) {
+            // node agent maybe not installed yet
+            // only delete cluster node db info
             nodeAndAgentManager.deleteOperation(resourceClusterId, host);
         }
 
@@ -116,15 +124,51 @@ public class ResourceClusterManager {
 
         // before install and start agent, to check whether port is available 
or not,
         // it can not guarantee the port must not be used when starting the 
agent,
-        // but it may expose this problem early if the port has been uses.
+        // but it may expose this problem early if the port has been used.
+        List<Pair<ResourceNodeEntity, CompletableFuture<Boolean>>> nodeFutures 
= new ArrayList<>();
         for (ResourceNodeEntity nodeEntity : nodeEntities) {
-            if (!nodeAndAgentManager.isAvailableAgentPort(nodeEntity, 
configInfo)) {
-                throw new Exception(nodeEntity.getHost() + ":" + 
nodeEntity.getAgentPort() + " is already in use");
+            CompletableFuture<Boolean> portCheckFuture = 
CompletableFuture.supplyAsync(() -> {
+                try {
+                    nodeAndAgentManager.checkSshConnect(nodeEntity, 
configInfo);
+                    return 
nodeAndAgentManager.isAvailableAgentPort(nodeEntity, configInfo);
+                } catch (Exception e) {
+                    log.error("check node {} exception: {}", 
nodeEntity.getHost(), e.getMessage());
+                    throw new CompletionException(e);
+                }
+            });
+            nodeFutures.add(Pair.of(nodeEntity, portCheckFuture));
+        }
+
+        boolean checkFailed = false;
+        StringBuilder exStrBuilder = new StringBuilder();
+        for (Pair<ResourceNodeEntity, CompletableFuture<Boolean>> nodeFuture: 
nodeFutures) {
+            ResourceNodeEntity nodeEntity = nodeFuture.getLeft();
+            CompletableFuture<Boolean> future = nodeFuture.getRight();
+            try {
+                boolean isAvailablePort = future.get();
+                if (!isAvailablePort) {
+                    checkFailed = true;
+                    log.error("node {}:{} port already in use", 
nodeEntity.getHost(), nodeEntity.getAgentPort());
+                    throw new Exception(String.format("node %s:%d port already 
in use",
+                            nodeEntity.getHost(), nodeEntity.getAgentPort()));
+                }
+            } catch (Exception e) {
+                checkFailed = true;
+                log.error("node {}:{} check exception {}", 
nodeEntity.getHost(), nodeEntity.getAgentPort(), e);
+                exStrBuilder.append(String.format("%s:%d, %s",
+                        nodeEntity.getHost(), nodeEntity.getAgentPort(), e));
+                exStrBuilder.append("\n");
             }
         }
 
+        if (checkFailed) {
+            log.error("check node exception list: {}\n", exStrBuilder);
+            throw new Exception(exStrBuilder.toString());
+        }
+
         log.debug("install agent for resource cluster {} all nodes", 
resourceClusterId);
         for (ResourceNodeEntity nodeEntity : nodeEntities) {
+            log.info("start to install agent to {} node {}", 
nodeEntity.getId(), nodeEntity.getHost());
             nodeAndAgentManager.installAgentOperation(nodeEntity, configInfo, 
requestId);
         }
     }
@@ -139,6 +183,94 @@ public class ResourceClusterManager {
                         + "The next step cannot be carried out temporarily");
             }
         }
+    }
+
+    public void deleteOperation(long resourceClusterId) throws Exception {
+        log.info("to delete resource cluster {} info", resourceClusterId);
+
+        Optional<ResourceClusterEntity> resourceClusterOpt = 
resourceClusterRepository.findById(resourceClusterId);
+        if (!resourceClusterOpt.isPresent()) {
+            log.error("resource cluster {} does not exist", resourceClusterId);
+            throw new Exception("resource cluster" + resourceClusterId + "does 
not exist");
+        }
+
+        ResourceClusterEntity clusterEntity =  resourceClusterOpt.get();
+
+        List<ResourceNodeEntity> nodeList = 
nodeRepository.getByResourceClusterId(resourceClusterId);
+        for (ResourceNodeEntity node : nodeList) {
+            log.info("delete node {}, host: {}", node.getId(), node.getHost());
+            nodeAndAgentManager.deleteOperation(node.getId());
+        }
+
+        log.info("delete resource cluster {}", resourceClusterId);
+        resourceClusterRepository.delete(clusterEntity);
+    }
+
+    public void deleteAgentsOperation(long resourceClusterId) throws  
Exception {
+        log.info("delete resource cluster {} all nodes agent", 
resourceClusterId);
 
+        Optional<ResourceClusterEntity> resourceClusterOpt = 
resourceClusterRepository.findById(resourceClusterId);
+        if (!resourceClusterOpt.isPresent()) {
+            throw new Exception("resource cluster " + resourceClusterId + " 
does not exist");
+        }
+
+        ResourceClusterEntity clusterEntity =  resourceClusterOpt.get();
+        PMResourceClusterAccessInfo accessInfo = 
JSON.parseObject(clusterEntity.getAccessInfo(),
+                PMResourceClusterAccessInfo.class);
+
+        List<ResourceNodeEntity> nodeEntities = 
nodeRepository.getByResourceClusterId(resourceClusterId);
+
+        List<ResourceNodeEntity> agentInstalledNodes = new ArrayList<>();
+        for (ResourceNodeEntity nodeEntity : nodeEntities) {
+            if (!nodeAndAgentManager.checkAgentOperation(nodeEntity)) {
+                log.warn("the agent has not been installed on {} node {}", 
nodeEntity.getId(), nodeEntity.getHost());
+            } else {
+                agentInstalledNodes.add(nodeEntity);
+            }
+        }
+
+        // we check something before uninstall agent
+        // to guarantee uninstall operation must be executed
+        List<Pair<ResourceNodeEntity, CompletableFuture<Void>>> nodeFutures = 
new ArrayList<>();
+        for (ResourceNodeEntity nodeEntity : agentInstalledNodes) {
+            CompletableFuture<Void> portCheckFuture = 
CompletableFuture.runAsync(() -> {
+                AgentInstallEventConfigInfo installConfig = new 
AgentInstallEventConfigInfo();
+                installConfig.setSshUser(accessInfo.getSshUser());
+                installConfig.setSshPort(accessInfo.getSshPort());
+                installConfig.setSshKey(accessInfo.getSshKey());
+
+                try {
+                    log.info("check ssh connect and stop script before 
uninstall agent on node {}", nodeEntity.getId());
+                    nodeAndAgentManager.checkSshConnect(nodeEntity, 
installConfig);
+                    nodeAndAgentManager.checkStopScriptExist(nodeEntity, 
installConfig);
+                } catch (Exception e) {
+                    log.error("check node {} exception: {}", 
nodeEntity.getHost(), e.getMessage());
+                    throw new CompletionException(e);
+                }
+            });
+            nodeFutures.add(Pair.of(nodeEntity, portCheckFuture));
+        }
+
+        for (Pair<ResourceNodeEntity, CompletableFuture<Void>> nodeFuture: 
nodeFutures) {
+            ResourceNodeEntity nodeEntity = nodeFuture.getLeft();
+            CompletableFuture<Void> future = nodeFuture.getRight();
+            try {
+                future.get();
+            } catch (Exception e) {
+                log.error("check {} node {} stop script exception {}", 
nodeEntity.getId(), nodeEntity.getHost(), e);
+                throw new Exception("check node stop script failed" + e);
+            }
+        }
+
+        // async delete agent
+        for (ResourceNodeEntity nodeEntity : nodeEntities) {
+            AgentUnInstallEventConfigInfo uninstallConfig = new 
AgentUnInstallEventConfigInfo(
+                    accessInfo.getSshUser(), accessInfo.getSshPort(), 
accessInfo.getSshKey(),
+                    nodeEntity.getHost(), nodeEntity.getAgentInstallDir(),
+                    nodeEntity.getId(), nodeEntity.getAgentPort());
+
+            log.info("to stop agent of {} node {}", nodeEntity.getId(), 
nodeEntity.getHost());
+            nodeAndAgentManager.deleteAgentOperation(nodeEntity, 
uninstallConfig);
+        }
     }
 }
diff --git a/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/ResourceNodeAndAgentManager.java b/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/ResourceNodeAndAgentManager.java
index e84a5af..45a5253 100644
--- a/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/ResourceNodeAndAgentManager.java
+++ b/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/ResourceNodeAndAgentManager.java
@@ -23,7 +23,9 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.doris.manager.common.heartbeat.HeartBeatEventResultType;
 import org.apache.doris.manager.common.heartbeat.HeartBeatEventType;
 import 
org.apache.doris.manager.common.heartbeat.config.AgentInstallEventConfigInfo;
+import 
org.apache.doris.manager.common.heartbeat.config.AgentUnInstallEventConfigInfo;
 import org.apache.doris.manager.common.heartbeat.stage.AgentInstallEventStage;
+import 
org.apache.doris.manager.common.heartbeat.stage.AgentUnInstallEventStage;
 import org.apache.doris.stack.constant.EnvironmentDefine;
 import org.apache.doris.stack.dao.HeartBeatEventRepository;
 import org.apache.doris.stack.dao.ResourceNodeRepository;
@@ -41,11 +43,14 @@ import org.springframework.stereotype.Component;
 import java.io.File;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.nio.file.Paths;
+import java.util.concurrent.CompletableFuture;
 
 @Slf4j
 @Component
 public class ResourceNodeAndAgentManager {
     private static final String AGENT_START_SCRIPT = 
Constants.KEY_DORIS_AGENT_START_SCRIPT;
+    private static final String AGENT_STOP_SCRIPT = 
Constants.KEY_DORIS_AGENT_STOP_SCRIPT;
     private static final String AGENT_CONFIG_PATH = 
Constants.KEY_DORIS_AGENT_CONFIG_PATH;
 
     @Autowired
@@ -61,37 +66,44 @@ public class ResourceNodeAndAgentManager {
         return newNodeEntity.getId();
     }
 
-    // TODO:Uninstall agent
     public void deleteOperation(long resourceClusterId, String host) {
         log.info("delete node {} for resource cluster {}", host, 
resourceClusterId);
         nodeRepository.deleteByResourceClusterIdAndHost(resourceClusterId, 
host);
     }
 
-    public boolean isAvailableAgentPort(ResourceNodeEntity node, 
AgentInstallEventConfigInfo configInfo)
-            throws Exception {
-        // agent port check, eg: Spring Boot Param server.port=8008
-        log.info("check {} node port {}:{}", node.getId(), node.getHost(), 
node.getAgentPort());
-        String sshkey = String.format("sshkey-%d-%d", node.getId(), 
node.getAgentPort());
-        File sshKeyFile = SSH.buildSshKeyFile(sshkey);
-        SSH.writeSshKeyFile(configInfo.getSshKey(), sshKeyFile);
+    public void deleteOperation(long nodeId) {
+        log.info("delete node {}", nodeId);
+        nodeRepository.deleteById(nodeId);
+    }
 
-        // only check listen port
-        String checkPortCmd = String.format("netstat -tunlp | grep  -w %d", 
node.getAgentPort());
-        SSH checkPortSSH = new SSH(configInfo.getSshUser(), 
configInfo.getSshPort(),
-                sshKeyFile.getAbsolutePath(), node.getHost(), checkPortCmd);
-        if (checkPortSSH.run()) {
-            String netInfo = checkPortSSH.getStdoutResponse();
-            log.info("agent node {} port check return output\n: {}", 
node.getId(), netInfo);
+    // only responsible to stop agent
+    public void deleteAgentOperation(ResourceNodeEntity node, 
AgentUnInstallEventConfigInfo configInfo)
+            throws Exception {
 
-            if (netInfo != null && !netInfo.trim().isEmpty()) {
-                log.error("agent node {} port {} already in use:\n {}", 
node.getId(), node.getAgentPort(), netInfo);
-                return false;
-            }
-        } else if (checkPortSSH.getExitCode() != 1) { //exit 1 when grep 
failed, other exit code is exception
-            log.warn("run check port cmd failed");
-            throw new Exception("check agent port scrpit execution exception");
+        // check agent has been installed or not
+        if (!checkAgentOperation(node)) {
+            log.warn("node[{}]:{} does not install agent, no need to 
uninstall", node.getId(), node.getHost());
+            return;
         }
-        return true;
+
+        HeartBeatEventEntity uninstallEvent = new 
HeartBeatEventEntity(HeartBeatEventType.AGENT_STOP.name(),
+                HeartBeatEventResultType.INIT.name(), 
JSON.toJSONString(configInfo), 0);
+
+        uninstallEvent = heartBeatEventRepository.save(uninstallEvent);
+
+        log.info("create event of deleting agent , event id {}", 
uninstallEvent.getId());
+        node.setCurrentEventId(uninstallEvent.getId());
+        nodeRepository.save(node);
+
+        log.info("to stop node[{}] agent for resource cluster {}", 
node.getHost(),
+                node.getResourceClusterId());
+
+        HeartBeatEventEntity finalUninstallEvent = uninstallEvent;
+        CompletableFuture<Void> uninstallFuture = 
CompletableFuture.runAsync(() -> {
+            log.info("start to handle uninstall agent event on {} node {}", 
node.getId(), node.getHost());
+            uninstallEventProcess(node, configInfo, finalUninstallEvent);
+            log.info("async uninstall agent on {} node {} success", 
node.getId(), node.getHost());
+        });
     }
 
     public void installAgentOperation(ResourceNodeEntity node, 
AgentInstallEventConfigInfo configInfo, long requestId) {
@@ -139,6 +151,125 @@ public class ResourceNodeAndAgentManager {
             agentInstallAgentEntity = 
heartBeatEventRepository.save(eventEntity);
         }
 
+        // handle install agent event async
+        CompletableFuture<Void> installFuture = CompletableFuture.runAsync(() 
-> {
+            log.info("start to handle install agent event to {} node {}", 
node.getId(), node.getHost());
+            installEventProcess(node, configInfo, agentInstallAgentEntity);
+            log.info("async install agent to {} node {} success", 
node.getId(), node.getHost());
+        });
+    }
+
+    public boolean checkAgentOperation(ResourceNodeEntity node) {
+        long eventId = node.getCurrentEventId();
+
+        if (eventId < 1L) {
+            log.warn("The node no have agent");
+            return false;
+        } else {
+            HeartBeatEventEntity eventEntity = 
heartBeatEventRepository.findById(eventId).get();
+            // If the agent has been successfully installed and a new agent 
request operation has been performed,
+            // the installation cannot be performed again
+            if 
(!eventEntity.getType().equals(HeartBeatEventType.AGENT_INSTALL.name())) {
+                return true;
+            }
+
+            if (eventEntity.isCompleted() && 
eventEntity.getStatus().equals(HeartBeatEventResultType.SUCCESS.name())) {
+                return true;
+            }
+
+            log.warn("Agent has not been installed successfully");
+            return false;
+        }
+    }
+
+    public boolean isAvailableAgentPort(ResourceNodeEntity node, 
AgentInstallEventConfigInfo configInfo)
+            throws Exception {
+        // agent port check, eg: Spring Boot Param server.port=8008
+        log.info("check {} node port {}:{}", node.getId(), node.getHost(), 
node.getAgentPort());
+        String sshkey = String.format("sshkey-%d-%d", node.getId(), 
node.getAgentPort());
+        File sshKeyFile = SSH.buildSshKeyFile(sshkey);
+        SSH.writeSshKeyFile(configInfo.getSshKey(), sshKeyFile);
+
+        // only check listen port
+        String checkPortCmd = String.format("netstat -tunlp | grep  -w %d", 
node.getAgentPort());
+        SSH checkPortSSH = new SSH(configInfo.getSshUser(), 
configInfo.getSshPort(),
+                sshKeyFile.getAbsolutePath(), node.getHost(), checkPortCmd);
+        if (checkPortSSH.run(3000)) {
+            String netInfo = checkPortSSH.getStdoutResponse();
+            log.info("agent node {} port check return output\n: {}", 
node.getId(), netInfo);
+
+            if (netInfo != null && !netInfo.trim().isEmpty()) {
+                log.error("agent node {} port {} already in use:\n {}", 
node.getId(), node.getAgentPort(), netInfo);
+                return false;
+            }
+        } else if (checkPortSSH.getExitCode() != 1) { //exit 1 when grep 
failed, other exit code is exception
+            log.warn("run check port cmd failed");
+            throw new Exception("check agent port scrpit execution exception:" 
+  checkPortSSH.getErrorResponse());
+        }
+        log.info("{} node {}:{} is available", node.getId(), node.getHost(), 
node.getAgentPort());
+        return true;
+    }
+
+    public void checkSshConnect(ResourceNodeEntity node, 
AgentInstallEventConfigInfo configInfo)
+            throws Exception {
+
+        log.info("check {} node {} ssh connect", node.getId(), node.getHost());
+        String sshkey = String.format("sshkey-%d", node.getId());
+        File sshKeyFile = SSH.buildSshKeyFile(sshkey);
+        SSH.writeSshKeyFile(configInfo.getSshKey(), sshKeyFile);
+
+        SSH checkSSH = new SSH(configInfo.getSshUser(), 
configInfo.getSshPort(),
+                sshKeyFile.getAbsolutePath(), node.getHost(), "echo ok");
+
+        // if ssh-key is invalid, ssh commond will enter interactive mode that 
will block the process
+        // user not exists(255), user error(need input password), ssh port 
error(1), host error(1)
+        if (checkSSH.run(3000)) {
+            log.info("ssh connect {} node {} success", node.getId(), 
node.getHost());
+        } else {
+            throw new Exception(String.format("ssh connect node %s failed: 
error code %d, error msg: %s",
+                    node.getHost(), checkSSH.getExitCode(),
+                    checkSSH.getErrorResponse()));
+        }
+    }
+
+    public void checkStopScriptExist(ResourceNodeEntity node, 
AgentInstallEventConfigInfo configInfo)
+            throws Exception {
+        log.info("check node {} agent stop script", node.getId());
+        checkPathExist(node, configInfo, Paths.get(node.getAgentInstallDir(), 
"agent", AGENT_STOP_SCRIPT).toString());
+    }
+
+    public void checkPathExist(ResourceNodeEntity node, 
AgentInstallEventConfigInfo configInfo,
+                               String path) throws Exception {
+
+        String sshkey = String.format("sshkey-%d", node.getId());
+        File sshKeyFile = SSH.buildSshKeyFile(sshkey);
+        SSH.writeSshKeyFile(configInfo.getSshKey(), sshKeyFile);
+
+        String checkPathExistCmd = "if test -e " + path + "; then echo ok; 
else exit 1; fi";
+        SSH checkPathSSH = new SSH(configInfo.getSshUser(), 
configInfo.getSshPort(),
+                sshKeyFile.getAbsolutePath(), node.getHost(), 
checkPathExistCmd);
+        if (!checkPathSSH.run()) {
+            if (checkPathSSH.getExitCode() == 1) {
+                log.error("path {} is not exist on node {}", path, 
node.getId());
+                throw new Exception(String.format("node %s does not have path: 
%s", node.getHost(), path));
+            } else {
+                throw new Exception("check path exception:" +  
checkPathSSH.getErrorResponse());
+            }
+        }
+    }
+
+    private void installEventProcess(ResourceNodeEntity node, 
AgentInstallEventConfigInfo configInfo,
+                                     HeartBeatEventEntity 
agentInstallAgentEntity) {
+        if 
(!agentInstallAgentEntity.getType().equals(HeartBeatEventType.AGENT_INSTALL.name()))
 {
+            log.warn("agent has been installed on {} node {}", node.getId(), 
node.getHost());
+            return;
+        }
+
+        if 
(!agentInstallAgentEntity.getStatus().equals(HeartBeatEventResultType.INIT.name()))
 {
+            log.warn("agent is being installed on {} node {}", node.getId(), 
node.getHost());
+            return;
+        }
+
         // AGENT_INSTALL heartbeat handle
         // TODO: Before each Stage operation, it is necessary to judge whether 
the event has been cancelled
         // ACCESS_AUTH stage
@@ -218,19 +349,6 @@ public class ResourceNodeAndAgentManager {
 
         // agent start
         // AGENT_START stage
-        try {
-            if (!isAvailableAgentPort(node, configInfo)) {
-                log.error("port {}:{} already in use", configInfo.getHost(), 
configInfo.getAgentPort());
-                updateFailResult(String.format("agent port %s:%d already in 
use",
-                                configInfo.getHost(), 
configInfo.getAgentPort()),
-                        AgentInstallEventStage.AGENT_DEPLOY.getStage(), 
agentInstallAgentEntity);
-                return;
-            }
-        } catch (Exception e) {
-            e.printStackTrace();
-            log.error("check agent port exception, skip port check");
-        }
-
         String agentInstallHome = configInfo.getInstallDir() + File.separator 
+ "agent";
 
         log.info("to start agent with port {}", configInfo.getAgentPort());
@@ -240,7 +358,7 @@ public class ResourceNodeAndAgentManager {
         SSH startSsh = new SSH(configInfo.getSshUser(), 
configInfo.getSshPort(),
                 sshKeyFile.getAbsolutePath(), configInfo.getHost(), cmd);
         if (!startSsh.run()) {
-            log.error("agent start failed:{}", ssh.getErrorResponse());
+            log.error("agent start failed:{}", startSsh.getErrorResponse());
             updateFailResult(AgentInstallEventStage.AGENT_START.getError(),
                     AgentInstallEventStage.AGENT_START.getStage(), 
agentInstallAgentEntity);
             return;
@@ -250,27 +368,46 @@ public class ResourceNodeAndAgentManager {
                 AgentInstallEventStage.AGENT_START.getStage(), 
agentInstallAgentEntity);
     }
 
-    public boolean checkAgentOperation(ResourceNodeEntity node) {
-        long eventId = node.getCurrentEventId();
+    /**
+     * Handles an AGENT_STOP heartbeat event by running the agent stop script on the node over SSH.
+     *
+     * Nothing is done unless the event has type AGENT_STOP and is still in INIT status. When the
+     * stop command fails the event is completed with FAIL and the method returns; otherwise the
+     * event is completed with SUCCESS.
+     *
+     * @param node                      node the agent runs on (used for the key file name and logging)
+     * @param configInfo                supplies the SSH user, port, key, host and install directory
+     * @param agentUninstallAgentEntity heartbeat event being processed; its status is updated and saved
+     */
+    private void uninstallEventProcess(ResourceNodeEntity node, AgentUnInstallEventConfigInfo configInfo,
+                                       HeartBeatEventEntity agentUninstallAgentEntity) {
+        if (!agentUninstallAgentEntity.getType().equals(HeartBeatEventType.AGENT_STOP.name())) {
+            log.warn("agent no need to stop on {} node {}", node.getId(), node.getHost());
+            return;
+        }
 
-        if (eventId < 1L) {
-            log.warn("The node no have agent");
-            return false;
-        } else {
-            HeartBeatEventEntity eventEntity = heartBeatEventRepository.findById(eventId).get();
-            // If the agent has been successfully installed and a new agent request operation has been performed,
-            // the installation cannot be performed again
-            if (!eventEntity.getType().equals(HeartBeatEventType.AGENT_INSTALL.name())) {
-                return true;
-            }
+        if (!agentUninstallAgentEntity.getStatus().equals(HeartBeatEventResultType.INIT.name())) {
+            log.warn("agent is being stopped on {} node {}", node.getId(), node.getHost());
+            return;
+        }
+        // warning: multithreaded write operation
+        String sshkey = String.format("sshkey-%d", node.getId());
+        File sshKeyFile = SSH.buildSshKeyFile(sshkey);
+        SSH.writeSshKeyFile(configInfo.getSshKey(), sshKeyFile);
 
-            if (eventEntity.isCompleted() && eventEntity.getStatus().equals(HeartBeatEventResultType.SUCCESS.name())) {
-                return true;
-            }
+        String agentInstallHome = configInfo.getInstallDir() + File.separator + "agent";
 
-            log.warn("Agent has not been installed successfully");
-            return false;
+        log.info("to uninstall {} agent", configInfo.getHost());
+//        String command = "cd %s && sh %s && rm -rf %s";
+//        String cmd = String.format(command, agentInstallHome, AGENT_STOP_SCRIPT, agentInstallHome);
+        String command = "cd %s && sh %s";
+        String cmd = String.format(command, agentInstallHome, AGENT_STOP_SCRIPT);
+        SSH stopSsh = new SSH(configInfo.getSshUser(), configInfo.getSshPort(),
+                sshKeyFile.getAbsolutePath(), configInfo.getHost(), cmd);
+        if (!stopSsh.run()) {
+            log.error("agent stop failed:{}", stopSsh.getErrorResponse());
+            agentUninstallAgentEntity.setCompleted(true);
+            agentUninstallAgentEntity.setStatus(HeartBeatEventResultType.FAIL.name());
+            heartBeatEventRepository.save(agentUninstallAgentEntity);
+
+            updateFailResult(AgentUnInstallEventStage.AGENT_STOP.getError() + stopSsh.getErrorResponse(),
+                    AgentUnInstallEventStage.AGENT_STOP.getStage(), agentUninstallAgentEntity);
+            // Stop here: falling through would log success and overwrite FAIL with SUCCESS.
+            return;
         }
+
+        log.info("node {} success to uninstall agent", node.getHost());
+
+        agentUninstallAgentEntity.setCompleted(true);
+        agentUninstallAgentEntity.setStatus(HeartBeatEventResultType.SUCCESS.name());
+        heartBeatEventRepository.save(agentUninstallAgentEntity);
     }
 
     private void updateProcessingResult(String result, int stage, 
HeartBeatEventEntity eventEntity) {
diff --git 
a/manager/dm-server/src/main/java/org/apache/doris/stack/shell/BaseCommand.java 
b/manager/dm-server/src/main/java/org/apache/doris/stack/shell/BaseCommand.java
index 2164bed..dc941ab 100644
--- 
a/manager/dm-server/src/main/java/org/apache/doris/stack/shell/BaseCommand.java
+++ 
b/manager/dm-server/src/main/java/org/apache/doris/stack/shell/BaseCommand.java
@@ -23,6 +23,7 @@ import org.apache.commons.lang3.StringUtils;
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 /**
@@ -51,8 +52,12 @@ public abstract class BaseCommand {
     }
 
+    /**
+     * Runs the command without a timeout; equivalent to {@code run(0)}.
+     *
+     * @return the result of {@link #run(long)} called with a timeout of 0
+     */
     public boolean run() {
+        return run(0);
+    }
+
+    public boolean run(long timeoutMs) {
         buildCommand();
-        log.info("run command: {}", StringUtils.join(resultCommand, " "));
+        log.info("run command: {} ,timeout time: {}ms", 
StringUtils.join(resultCommand, " "), timeoutMs);
         ProcessBuilder pb = new ProcessBuilder(resultCommand);
         Process process = null;
         BufferedReader stdoutBufferedReader = null;
@@ -65,7 +70,18 @@ public abstract class BaseCommand {
             stdoutResponse = 
stdoutBufferedReader.lines().parallel().collect(Collectors.joining(System.lineSeparator()));
             errorResponse = 
errorBufferedReader.lines().parallel().collect(Collectors.joining(System.lineSeparator()));
 
-            exitCode = process.waitFor();
+            if (timeoutMs <= 0) {
+                exitCode = process.waitFor();
+            } else {
+                // timeoutMs is in milliseconds, so the wait must use the same unit
+                boolean isExit = process.waitFor(timeoutMs, TimeUnit.MILLISECONDS);
+                if (!isExit) {
+                    exitCode = 124; // the same as timeout command
+                    log.error("command run timeout in {}ms", timeoutMs);
+                    return false;
+                }
+                exitCode = process.exitValue();
+            }
+
             if (exitCode == 0) {
                 return true;
             } else {
diff --git 
a/manager/dm-server/src/main/java/org/apache/doris/stack/shell/SSH.java 
b/manager/dm-server/src/main/java/org/apache/doris/stack/shell/SSH.java
index 456eec0..48a001e 100644
--- a/manager/dm-server/src/main/java/org/apache/doris/stack/shell/SSH.java
+++ b/manager/dm-server/src/main/java/org/apache/doris/stack/shell/SSH.java
@@ -95,9 +95,11 @@ public class SSH extends BaseCommand {
 
     protected void buildCommand() {
         String[] command = new String[]{"ssh",
-                "-o", "ConnectTimeOut=60",
+                "-o", "ConnectTimeOut=20",
                 "-o", "StrictHostKeyChecking=no",
                 "-o", "BatchMode=yes",
+                // close password auth in case of blocking program when ssh 
auth failed
+                "-o", "PasswordAuthentication=no",
                 "-i", this.sshKeyFile,
                 "-p", String.valueOf(this.sshPort),
                 this.user + "@" + this.host, this.command
diff --git 
a/manager/dm-server/src/main/java/org/apache/doris/stack/util/Constants.java 
b/manager/dm-server/src/main/java/org/apache/doris/stack/util/Constants.java
index 896e1d0..0869a27 100644
--- a/manager/dm-server/src/main/java/org/apache/doris/stack/util/Constants.java
+++ b/manager/dm-server/src/main/java/org/apache/doris/stack/util/Constants.java
@@ -19,6 +19,8 @@ package org.apache.doris.stack.util;
 
 public class Constants {
     public static final String KEY_DORIS_AGENT_START_SCRIPT = 
"bin/agent_start.sh";
+    public static final String KEY_DORIS_AGENT_STOP_SCRIPT = 
"bin/agent_stop.sh";
+
     public static final String KEY_DORIS_AGENT_CONFIG_PATH = 
"config/application.properties";
     public static final String KEY_FE_QUERY_PORT = "query_port";
     public static final String KEY_FE_EDIT_LOG_PORT = "edit_log_port";
diff --git 
a/manager/manager-server/src/main/java/org/apache/doris/stack/service/PaloUserSpaceService.java
 
b/manager/manager-server/src/main/java/org/apache/doris/stack/service/PaloUserSpaceService.java
index 4f8f3eb..85dcff7 100644
--- 
a/manager/manager-server/src/main/java/org/apache/doris/stack/service/PaloUserSpaceService.java
+++ 
b/manager/manager-server/src/main/java/org/apache/doris/stack/service/PaloUserSpaceService.java
@@ -42,7 +42,7 @@ public class PaloUserSpaceService extends BaseService {
     @Autowired
     private DorisManagerUserSpaceComponent userSpaceComponent;
 
-    @Transactional
+    // rollbackFor = Exception.class: this method declares "throws Exception", and Spring's default
+    // rollback rules only cover RuntimeException and Error, not checked exceptions.
+    @Transactional(rollbackFor = Exception.class)
     public long create(NewUserSpaceCreateReq createReq, CoreUserEntity user) 
throws Exception {
         return userSpaceComponent.create(createReq, user.getFirstName());
     }
@@ -62,7 +62,7 @@ public class PaloUserSpaceService extends BaseService {
      * @return
      * @throws Exception
      */
-    @Transactional
+    // rollbackFor = Exception.class: this method declares "throws Exception", and Spring's default
+    // rollback rules only cover RuntimeException and Error, not checked exceptions.
+    @Transactional(rollbackFor = Exception.class)
     public NewUserSpaceInfo update(CoreUserEntity user, int spaceId, 
NewUserSpaceCreateReq updateReq) throws Exception {
         return userSpaceComponent.update(user, spaceId, updateReq);
     }
@@ -93,7 +93,7 @@ public class PaloUserSpaceService extends BaseService {
      * @param spaceId
      * @throws Exception
      */
-    @Transactional
+    // rollbackFor = Exception.class: this method declares "throws Exception", and Spring's default
+    // rollback rules only cover RuntimeException and Error, not checked exceptions.
+    @Transactional(rollbackFor = Exception.class)
     public void deleteSpace(int spaceId) throws Exception {
         userSpaceComponent.deleteSpace(spaceId);
     }
diff --git 
a/manager/resource-common/src/main/java/org/apache/doris/stack/dao/ClusterInstanceRepository.java
 
b/manager/resource-common/src/main/java/org/apache/doris/stack/dao/ClusterInstanceRepository.java
index b77486e..53c5ce7 100644
--- 
a/manager/resource-common/src/main/java/org/apache/doris/stack/dao/ClusterInstanceRepository.java
+++ 
b/manager/resource-common/src/main/java/org/apache/doris/stack/dao/ClusterInstanceRepository.java
@@ -21,8 +21,10 @@ import org.apache.doris.stack.entity.ClusterInstanceEntity;
 import org.springframework.cache.annotation.CacheEvict;
 import org.springframework.cache.annotation.Cacheable;
 import org.springframework.data.jpa.repository.JpaRepository;
+import org.springframework.data.jpa.repository.Modifying;
 import org.springframework.data.jpa.repository.Query;
 import org.springframework.data.repository.query.Param;
+import org.springframework.transaction.annotation.Transactional;
 
 import java.util.List;
 
@@ -46,6 +48,8 @@ public interface ClusterInstanceRepository extends 
JpaRepository<ClusterInstance
     void deleteById(Long id);
 
+    // Deleting an entity also evicts the "cluster_instance" cache entry keyed by its nodeId.
+    // NOTE(review): @Modifying is documented for @Query methods; confirm it has an effect on
+    // this inherited delete(...) override.
     @Override
+    @Transactional
+    @Modifying
     @CacheEvict(value = "cluster_instance", key = "#entity.nodeId")
     void delete(ClusterInstanceEntity entity);
 }
diff --git 
a/manager/resource-common/src/main/java/org/apache/doris/stack/dao/ClusterModuleServiceRepository.java
 
b/manager/resource-common/src/main/java/org/apache/doris/stack/dao/ClusterModuleServiceRepository.java
index 81d8144..81fc551 100644
--- 
a/manager/resource-common/src/main/java/org/apache/doris/stack/dao/ClusterModuleServiceRepository.java
+++ 
b/manager/resource-common/src/main/java/org/apache/doris/stack/dao/ClusterModuleServiceRepository.java
@@ -22,6 +22,7 @@ import org.springframework.data.jpa.repository.JpaRepository;
 import org.springframework.data.jpa.repository.Modifying;
 import org.springframework.data.jpa.repository.Query;
 import org.springframework.data.repository.query.Param;
+import org.springframework.transaction.annotation.Transactional;
 
 import java.util.List;
 
@@ -34,6 +35,7 @@ public interface ClusterModuleServiceRepository extends 
JpaRepository<ClusterMod
     @Query("select c from ClusterModuleServiceEntity c where c.clusterId = 
:clusterId")
     List<ClusterModuleServiceEntity> getByClusterId(@Param("clusterId") long 
clusterId);
 
+    // Deletes every ClusterModuleServiceEntity whose moduleId matches the argument.
+    // NOTE(review): @Transactional presumably lets this delete query run when the caller has not
+    // opened a transaction itself — confirm against the callers.
+    @Transactional
     @Modifying
     @Query("delete from ClusterModuleServiceEntity c where c.moduleId = 
:moduleId")
     void deleteByModuleId(@Param("moduleId") long moduleId);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to