This is an automated email from the ASF dual-hosted git repository.
jiafengzheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris-manager.git
The following commit(s) were added to refs/heads/master by this push:
new f217049 node agent async start and stop (#37)
f217049 is described below
commit f2170493a21a706e602495d2ff8c04a5e2588cde
Author: LiRui <[email protected]>
AuthorDate: Fri Apr 8 15:08:37 2022 +0800
node agent async start and stop (#37)
node agent async start and stop
---
.../config/AgentUnInstallEventConfigInfo.java | 41 ++++
.../heartbeat/stage/AgentUnInstallEventStage.java | 38 ++++
.../component/DorisManagerUserSpaceComponent.java | 38 ++--
.../stack/control/manager/DorisClusterManager.java | 7 +-
.../control/manager/ResourceClusterManager.java | 138 +++++++++++-
.../manager/ResourceNodeAndAgentManager.java | 245 ++++++++++++++++-----
.../org/apache/doris/stack/shell/BaseCommand.java | 20 +-
.../java/org/apache/doris/stack/shell/SSH.java | 4 +-
.../org/apache/doris/stack/util/Constants.java | 2 +
.../doris/stack/service/PaloUserSpaceService.java | 6 +-
.../doris/stack/dao/ClusterInstanceRepository.java | 4 +
.../stack/dao/ClusterModuleServiceRepository.java | 2 +
12 files changed, 463 insertions(+), 82 deletions(-)
diff --git
a/manager/dm-common/src/main/java/org/apache/doris/manager/common/heartbeat/config/AgentUnInstallEventConfigInfo.java
b/manager/dm-common/src/main/java/org/apache/doris/manager/common/heartbeat/config/AgentUnInstallEventConfigInfo.java
new file mode 100644
index 0000000..963c729
--- /dev/null
+++
b/manager/dm-common/src/main/java/org/apache/doris/manager/common/heartbeat/config/AgentUnInstallEventConfigInfo.java
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.manager.common.heartbeat.config;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class AgentUnInstallEventConfigInfo {
+ private String sshUser;
+
+ private int sshPort;
+
+ private String sshKey;
+
+ private String host;
+
+ private String installDir;
+
+ private long agentNodeId;
+
+ private int agentPort;
+}
diff --git
a/manager/dm-common/src/main/java/org/apache/doris/manager/common/heartbeat/stage/AgentUnInstallEventStage.java
b/manager/dm-common/src/main/java/org/apache/doris/manager/common/heartbeat/stage/AgentUnInstallEventStage.java
new file mode 100644
index 0000000..7c0a721
--- /dev/null
+++
b/manager/dm-common/src/main/java/org/apache/doris/manager/common/heartbeat/stage/AgentUnInstallEventStage.java
@@ -0,0 +1,38 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.manager.common.heartbeat.stage;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+
+@AllArgsConstructor
+@NoArgsConstructor
+@Getter
+public enum AgentUnInstallEventStage {
+ AGENT_STOP("agent stop failed", "agent stop succeeded", 1, true);
+
+ private String error;
+
+ private String message;
+
+ private int stage;
+
+ private boolean isLast;
+
+}
diff --git
a/manager/dm-server/src/main/java/org/apache/doris/stack/component/DorisManagerUserSpaceComponent.java
b/manager/dm-server/src/main/java/org/apache/doris/stack/component/DorisManagerUserSpaceComponent.java
index 2ecc6b4..8d75065 100644
---
a/manager/dm-server/src/main/java/org/apache/doris/stack/component/DorisManagerUserSpaceComponent.java
+++
b/manager/dm-server/src/main/java/org/apache/doris/stack/component/DorisManagerUserSpaceComponent.java
@@ -26,6 +26,7 @@ import org.apache.doris.stack.constant.ConstantDef;
import org.apache.doris.stack.control.ModelControlLevel;
import org.apache.doris.stack.control.ModelControlRequestType;
import org.apache.doris.stack.control.manager.DorisClusterManager;
+import org.apache.doris.stack.control.manager.ResourceClusterManager;
import org.apache.doris.stack.dao.ClusterInfoRepository;
import org.apache.doris.stack.dao.ClusterUserMembershipRepository;
import org.apache.doris.stack.dao.CoreUserRepository;
@@ -95,6 +96,9 @@ public class DorisManagerUserSpaceComponent extends
BaseService {
@Autowired
private ClusterInfoRepository clusterInfoRepository;
+ @Autowired
+ private ResourceClusterManager resourceClusterManager;
+
@Autowired
private PaloLoginClient paloLoginClient;
@@ -447,24 +451,27 @@ public class DorisManagerUserSpaceComponent extends
BaseService {
// delete cluster information
clusterInfoRepository.deleteById(spaceId);
- try {
- // delete cluster configuration
- log.debug("delete cluster {} config infos.", spaceId);
- settingComponent.deleteAdminSetting(spaceId);
+ // delete cluster configuration
+ log.debug("delete cluster {} config infos.", spaceId);
+ settingComponent.deleteAdminSetting(spaceId);
- deleteClusterPermissionInfo(clusterInfo);
+ deleteClusterPermissionInfo(clusterInfo);
- // delete user information
- log.debug("delete cluster {} all user membership.", spaceId);
- clusterUserMembershipRepository.deleteByClusterId(spaceId);
+ // delete user information
+ log.debug("delete cluster {} all user membership.", spaceId);
+ clusterUserMembershipRepository.deleteByClusterId(spaceId);
- // TODO: In order to be compatible with the deleted content of
spatial information before, it is put here.
- // If the interface that releases both cluster and physical
resources is implemented later,
- // it will be unified in the current doriscluster processing
operation
- clusterManager.deleteClusterOperation(clusterInfo);
- } catch (Exception e) {
- log.warn("delete space {} related information failed", spaceId, e);
+ // TODO: In order to be compatible with the deleted content of spatial
information before, it is put here.
+ // If the interface that releases both cluster and physical resources
is implemented later,
+ // it will be unified in the current doriscluster processing operation
+ clusterManager.deleteClusterOperation(clusterInfo);
+
+ if (clusterInfo.getResourceClusterId() < 1L) {
+ log.info("resource cluster has not been created");
+ return;
}
+
resourceClusterManager.deleteAgentsOperation(clusterInfo.getResourceClusterId());
+
resourceClusterManager.deleteOperation(clusterInfo.getResourceClusterId());
}
private void deleteClusterPermissionInfo(ClusterInfoEntity clusterInfo)
throws Exception {
@@ -489,7 +496,8 @@ public class DorisManagerUserSpaceComponent extends
BaseService {
managerMetaSyncComponent.deleteClusterMetadata(clusterInfo);
log.debug("Delete cluster {} analyzer user {}.", spaceId,
allUserGroup.getPaloUserName());
- queryClient.deleteUser(ConstantDef.DORIS_DEFAULT_NS,
ConstantDef.MYSQL_DEFAULT_SCHEMA, clusterInfo, allUserGroup.getPaloUserName());
+ queryClient.deleteUser(ConstantDef.DORIS_DEFAULT_NS,
ConstantDef.MYSQL_DEFAULT_SCHEMA, clusterInfo,
+ allUserGroup.getPaloUserName());
}
// After deleting the user's space, set clusterid to 0
diff --git
a/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/DorisClusterManager.java
b/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/DorisClusterManager.java
index e655206..da078ef 100644
---
a/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/DorisClusterManager.java
+++
b/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/DorisClusterManager.java
@@ -80,13 +80,13 @@ public class DorisClusterManager {
private JdbcSampleClient jdbcClient;
// Ensure the data atomicity of creating user space, so add transactions
- @Transactional
+ @Transactional(rollbackFor = Exception.class)
public long initOperation(NewUserSpaceCreateReq spaceInfo, String creator)
throws Exception {
return userSpaceComponent.create(spaceInfo, creator);
}
// Ensure the atomicity of data in user space, so add transactions
- @Transactional
+ @Transactional(rollbackFor = Exception.class)
public void updateClusterOperation(CoreUserEntity user, long clusterId,
NewUserSpaceCreateReq spaceInfo) throws
Exception {
userSpaceComponent.update(user, clusterId, spaceInfo);
@@ -319,7 +319,7 @@ public class DorisClusterManager {
public void deleteClusterOperation(ClusterInfoEntity clusterInfo)throws
Exception {
long clusterId = clusterInfo.getId();
- log.info("Delete cluster {} instances operation.", clusterId);
+ log.info("Delete {} cluster {} instances operation.", clusterId,
clusterInfo.getName());
deleteClusterOperation(clusterId);
}
@@ -330,5 +330,4 @@ public class DorisClusterManager {
clusterModuleManager.deleteOperation(moduleEntity);
}
}
-
}
diff --git
a/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/ResourceClusterManager.java
b/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/ResourceClusterManager.java
index 2451aef..75fd80b 100644
---
a/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/ResourceClusterManager.java
+++
b/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/ResourceClusterManager.java
@@ -19,7 +19,9 @@ package org.apache.doris.stack.control.manager;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.tuple.Pair;
import
org.apache.doris.manager.common.heartbeat.config.AgentInstallEventConfigInfo;
+import
org.apache.doris.manager.common.heartbeat.config.AgentUnInstallEventConfigInfo;
import org.apache.doris.stack.dao.ResourceClusterRepository;
import org.apache.doris.stack.dao.ResourceNodeRepository;
import org.apache.doris.stack.entity.ResourceClusterEntity;
@@ -29,7 +31,11 @@ import org.apache.doris.stack.util.ListUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
+import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
@Slf4j
@Component
@@ -74,6 +80,8 @@ public class ResourceClusterManager {
List<String> reduceList = ListUtil.getReduceList(hosts, existHosts);
log.debug("resource cluster {} reduce nodes", reduceList);
for (String host : reduceList) {
+ // node agent maybe not installed yet
+ // only delete cluster node db info
nodeAndAgentManager.deleteOperation(resourceClusterId, host);
}
@@ -116,15 +124,51 @@ public class ResourceClusterManager {
// before install and start agent, to check whether port is available
or not,
// it can not guarantee the port must not be used when starting the
agent,
- // but it may expose this problem early if the port has been uses.
+ // but it may expose this problem early if the port has been used.
+ List<Pair<ResourceNodeEntity, CompletableFuture<Boolean>>> nodeFutures
= new ArrayList<>();
for (ResourceNodeEntity nodeEntity : nodeEntities) {
- if (!nodeAndAgentManager.isAvailableAgentPort(nodeEntity,
configInfo)) {
- throw new Exception(nodeEntity.getHost() + ":" +
nodeEntity.getAgentPort() + " is already in use");
+ CompletableFuture<Boolean> portCheckFuture =
CompletableFuture.supplyAsync(() -> {
+ try {
+ nodeAndAgentManager.checkSshConnect(nodeEntity,
configInfo);
+ return
nodeAndAgentManager.isAvailableAgentPort(nodeEntity, configInfo);
+ } catch (Exception e) {
+ log.error("check node {} exception: {}",
nodeEntity.getHost(), e.getMessage());
+ throw new CompletionException(e);
+ }
+ });
+ nodeFutures.add(Pair.of(nodeEntity, portCheckFuture));
+ }
+
+ boolean checkFailed = false;
+ StringBuilder exStrBuilder = new StringBuilder();
+ for (Pair<ResourceNodeEntity, CompletableFuture<Boolean>> nodeFuture:
nodeFutures) {
+ ResourceNodeEntity nodeEntity = nodeFuture.getLeft();
+ CompletableFuture<Boolean> future = nodeFuture.getRight();
+ try {
+ boolean isAvailablePort = future.get();
+ if (!isAvailablePort) {
+ checkFailed = true;
+ log.error("node {}:{} port already in use",
nodeEntity.getHost(), nodeEntity.getAgentPort());
+ throw new Exception(String.format("node %s:%d port already
in use",
+ nodeEntity.getHost(), nodeEntity.getAgentPort()));
+ }
+ } catch (Exception e) {
+ checkFailed = true;
+ log.error("node {}:{} check exception {}",
nodeEntity.getHost(), nodeEntity.getAgentPort(), e);
+ exStrBuilder.append(String.format("%s:%d, %s",
+ nodeEntity.getHost(), nodeEntity.getAgentPort(), e));
+ exStrBuilder.append("\n");
}
}
+ if (checkFailed) {
+ log.error("check node exception list: {}\n", exStrBuilder);
+ throw new Exception(exStrBuilder.toString());
+ }
+
log.debug("install agent for resource cluster {} all nodes",
resourceClusterId);
for (ResourceNodeEntity nodeEntity : nodeEntities) {
+ log.info("start to install agent to {} node {}",
nodeEntity.getId(), nodeEntity.getHost());
nodeAndAgentManager.installAgentOperation(nodeEntity, configInfo,
requestId);
}
}
@@ -139,6 +183,94 @@ public class ResourceClusterManager {
+ "The next step cannot be carried out temporarily");
}
}
+ }
+
+ public void deleteOperation(long resourceClusterId) throws Exception {
+ log.info("to delete resource cluster {} info", resourceClusterId);
+
+ Optional<ResourceClusterEntity> resourceClusterOpt =
resourceClusterRepository.findById(resourceClusterId);
+ if (!resourceClusterOpt.isPresent()) {
+ log.error("resource cluster {} does not exist", resourceClusterId);
+ throw new Exception("resource cluster" + resourceClusterId + "does
not exist");
+ }
+
+ ResourceClusterEntity clusterEntity = resourceClusterOpt.get();
+
+ List<ResourceNodeEntity> nodeList =
nodeRepository.getByResourceClusterId(resourceClusterId);
+ for (ResourceNodeEntity node : nodeList) {
+ log.info("delete node {}, host: {}", node.getId(), node.getHost());
+ nodeAndAgentManager.deleteOperation(node.getId());
+ }
+
+ log.info("delete resource cluster {}", resourceClusterId);
+ resourceClusterRepository.delete(clusterEntity);
+ }
+
+ public void deleteAgentsOperation(long resourceClusterId) throws
Exception {
+ log.info("delete resource cluster {} all nodes agent",
resourceClusterId);
+ Optional<ResourceClusterEntity> resourceClusterOpt =
resourceClusterRepository.findById(resourceClusterId);
+ if (!resourceClusterOpt.isPresent()) {
+ throw new Exception("resource cluster " + resourceClusterId + "
does not exist");
+ }
+
+ ResourceClusterEntity clusterEntity = resourceClusterOpt.get();
+ PMResourceClusterAccessInfo accessInfo =
JSON.parseObject(clusterEntity.getAccessInfo(),
+ PMResourceClusterAccessInfo.class);
+
+ List<ResourceNodeEntity> nodeEntities =
nodeRepository.getByResourceClusterId(resourceClusterId);
+
+ List<ResourceNodeEntity> agentInstalledNodes = new ArrayList<>();
+ for (ResourceNodeEntity nodeEntity : nodeEntities) {
+ if (!nodeAndAgentManager.checkAgentOperation(nodeEntity)) {
+ log.warn("the agent has not been installed on {} node {}",
nodeEntity.getId(), nodeEntity.getHost());
+ } else {
+ agentInstalledNodes.add(nodeEntity);
+ }
+ }
+
+ // we check something before uninstall agent
+ // to guarantee uninstall operation must be executed
+ List<Pair<ResourceNodeEntity, CompletableFuture<Void>>> nodeFutures =
new ArrayList<>();
+ for (ResourceNodeEntity nodeEntity : agentInstalledNodes) {
+ CompletableFuture<Void> portCheckFuture =
CompletableFuture.runAsync(() -> {
+ AgentInstallEventConfigInfo installConfig = new
AgentInstallEventConfigInfo();
+ installConfig.setSshUser(accessInfo.getSshUser());
+ installConfig.setSshPort(accessInfo.getSshPort());
+ installConfig.setSshKey(accessInfo.getSshKey());
+
+ try {
+ log.info("check ssh connect and stop script before
uninstall agent on node {}", nodeEntity.getId());
+ nodeAndAgentManager.checkSshConnect(nodeEntity,
installConfig);
+ nodeAndAgentManager.checkStopScriptExist(nodeEntity,
installConfig);
+ } catch (Exception e) {
+ log.error("check node {} exception: {}",
nodeEntity.getHost(), e.getMessage());
+ throw new CompletionException(e);
+ }
+ });
+ nodeFutures.add(Pair.of(nodeEntity, portCheckFuture));
+ }
+
+ for (Pair<ResourceNodeEntity, CompletableFuture<Void>> nodeFuture:
nodeFutures) {
+ ResourceNodeEntity nodeEntity = nodeFuture.getLeft();
+ CompletableFuture<Void> future = nodeFuture.getRight();
+ try {
+ future.get();
+ } catch (Exception e) {
+ log.error("check {} node {} stop script exception {}",
nodeEntity.getId(), nodeEntity.getHost(), e);
+ throw new Exception("check node stop script failed" + e);
+ }
+ }
+
+ // async delete agent
+ for (ResourceNodeEntity nodeEntity : nodeEntities) {
+ AgentUnInstallEventConfigInfo uninstallConfig = new
AgentUnInstallEventConfigInfo(
+ accessInfo.getSshUser(), accessInfo.getSshPort(),
accessInfo.getSshKey(),
+ nodeEntity.getHost(), nodeEntity.getAgentInstallDir(),
+ nodeEntity.getId(), nodeEntity.getAgentPort());
+
+ log.info("to stop agent of {} node {}", nodeEntity.getId(),
nodeEntity.getHost());
+ nodeAndAgentManager.deleteAgentOperation(nodeEntity,
uninstallConfig);
+ }
}
}
diff --git
a/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/ResourceNodeAndAgentManager.java
b/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/ResourceNodeAndAgentManager.java
index e84a5af..45a5253 100644
---
a/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/ResourceNodeAndAgentManager.java
+++
b/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/ResourceNodeAndAgentManager.java
@@ -23,7 +23,9 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.doris.manager.common.heartbeat.HeartBeatEventResultType;
import org.apache.doris.manager.common.heartbeat.HeartBeatEventType;
import
org.apache.doris.manager.common.heartbeat.config.AgentInstallEventConfigInfo;
+import
org.apache.doris.manager.common.heartbeat.config.AgentUnInstallEventConfigInfo;
import org.apache.doris.manager.common.heartbeat.stage.AgentInstallEventStage;
+import
org.apache.doris.manager.common.heartbeat.stage.AgentUnInstallEventStage;
import org.apache.doris.stack.constant.EnvironmentDefine;
import org.apache.doris.stack.dao.HeartBeatEventRepository;
import org.apache.doris.stack.dao.ResourceNodeRepository;
@@ -41,11 +43,14 @@ import org.springframework.stereotype.Component;
import java.io.File;
import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.nio.file.Paths;
+import java.util.concurrent.CompletableFuture;
@Slf4j
@Component
public class ResourceNodeAndAgentManager {
private static final String AGENT_START_SCRIPT =
Constants.KEY_DORIS_AGENT_START_SCRIPT;
+ private static final String AGENT_STOP_SCRIPT =
Constants.KEY_DORIS_AGENT_STOP_SCRIPT;
private static final String AGENT_CONFIG_PATH =
Constants.KEY_DORIS_AGENT_CONFIG_PATH;
@Autowired
@@ -61,37 +66,44 @@ public class ResourceNodeAndAgentManager {
return newNodeEntity.getId();
}
- // TODO:Uninstall agent
public void deleteOperation(long resourceClusterId, String host) {
log.info("delete node {} for resource cluster {}", host,
resourceClusterId);
nodeRepository.deleteByResourceClusterIdAndHost(resourceClusterId,
host);
}
- public boolean isAvailableAgentPort(ResourceNodeEntity node,
AgentInstallEventConfigInfo configInfo)
- throws Exception {
- // agent port check, eg: Spring Boot Param server.port=8008
- log.info("check {} node port {}:{}", node.getId(), node.getHost(),
node.getAgentPort());
- String sshkey = String.format("sshkey-%d-%d", node.getId(),
node.getAgentPort());
- File sshKeyFile = SSH.buildSshKeyFile(sshkey);
- SSH.writeSshKeyFile(configInfo.getSshKey(), sshKeyFile);
+ public void deleteOperation(long nodeId) {
+ log.info("delete node {}", nodeId);
+ nodeRepository.deleteById(nodeId);
+ }
- // only check listen port
- String checkPortCmd = String.format("netstat -tunlp | grep -w %d",
node.getAgentPort());
- SSH checkPortSSH = new SSH(configInfo.getSshUser(),
configInfo.getSshPort(),
- sshKeyFile.getAbsolutePath(), node.getHost(), checkPortCmd);
- if (checkPortSSH.run()) {
- String netInfo = checkPortSSH.getStdoutResponse();
- log.info("agent node {} port check return output\n: {}",
node.getId(), netInfo);
+ // only responsible to stop agent
+ public void deleteAgentOperation(ResourceNodeEntity node,
AgentUnInstallEventConfigInfo configInfo)
+ throws Exception {
- if (netInfo != null && !netInfo.trim().isEmpty()) {
- log.error("agent node {} port {} already in use:\n {}",
node.getId(), node.getAgentPort(), netInfo);
- return false;
- }
- } else if (checkPortSSH.getExitCode() != 1) { //exit 1 when grep
failed, other exit code is exception
- log.warn("run check port cmd failed");
- throw new Exception("check agent port scrpit execution exception");
+ // check agent has been installed or not
+ if (!checkAgentOperation(node)) {
+ log.warn("node[{}]:{} does not install agent, no need to
uninstall", node.getId(), node.getHost());
+ return;
}
- return true;
+
+ HeartBeatEventEntity uninstallEvent = new
HeartBeatEventEntity(HeartBeatEventType.AGENT_STOP.name(),
+ HeartBeatEventResultType.INIT.name(),
JSON.toJSONString(configInfo), 0);
+
+ uninstallEvent = heartBeatEventRepository.save(uninstallEvent);
+
+ log.info("create event of deleting agent , event id {}",
uninstallEvent.getId());
+ node.setCurrentEventId(uninstallEvent.getId());
+ nodeRepository.save(node);
+
+ log.info("to stop node[{}] agent for resource cluster {}",
node.getHost(),
+ node.getResourceClusterId());
+
+ HeartBeatEventEntity finalUninstallEvent = uninstallEvent;
+ CompletableFuture<Void> uninstallFuture =
CompletableFuture.runAsync(() -> {
+ log.info("start to handle uninstall agent event on {} node {}",
node.getId(), node.getHost());
+ uninstallEventProcess(node, configInfo, finalUninstallEvent);
+ log.info("async uninstall agent on {} node {} success",
node.getId(), node.getHost());
+ });
}
public void installAgentOperation(ResourceNodeEntity node,
AgentInstallEventConfigInfo configInfo, long requestId) {
@@ -139,6 +151,125 @@ public class ResourceNodeAndAgentManager {
agentInstallAgentEntity =
heartBeatEventRepository.save(eventEntity);
}
+ // handle install agent event async
+ CompletableFuture<Void> installFuture = CompletableFuture.runAsync(()
-> {
+ log.info("start to handle install agent event to {} node {}",
node.getId(), node.getHost());
+ installEventProcess(node, configInfo, agentInstallAgentEntity);
+ log.info("async install agent to {} node {} success",
node.getId(), node.getHost());
+ });
+ }
+
+ public boolean checkAgentOperation(ResourceNodeEntity node) {
+ long eventId = node.getCurrentEventId();
+
+ if (eventId < 1L) {
+ log.warn("The node no have agent");
+ return false;
+ } else {
+ HeartBeatEventEntity eventEntity =
heartBeatEventRepository.findById(eventId).get();
+ // If the agent has been successfully installed and a new agent
request operation has been performed,
+ // the installation cannot be performed again
+ if
(!eventEntity.getType().equals(HeartBeatEventType.AGENT_INSTALL.name())) {
+ return true;
+ }
+
+ if (eventEntity.isCompleted() &&
eventEntity.getStatus().equals(HeartBeatEventResultType.SUCCESS.name())) {
+ return true;
+ }
+
+ log.warn("Agent has not been installed successfully");
+ return false;
+ }
+ }
+
+ public boolean isAvailableAgentPort(ResourceNodeEntity node,
AgentInstallEventConfigInfo configInfo)
+ throws Exception {
+ // agent port check, eg: Spring Boot Param server.port=8008
+ log.info("check {} node port {}:{}", node.getId(), node.getHost(),
node.getAgentPort());
+ String sshkey = String.format("sshkey-%d-%d", node.getId(),
node.getAgentPort());
+ File sshKeyFile = SSH.buildSshKeyFile(sshkey);
+ SSH.writeSshKeyFile(configInfo.getSshKey(), sshKeyFile);
+
+ // only check listen port
+ String checkPortCmd = String.format("netstat -tunlp | grep -w %d",
node.getAgentPort());
+ SSH checkPortSSH = new SSH(configInfo.getSshUser(),
configInfo.getSshPort(),
+ sshKeyFile.getAbsolutePath(), node.getHost(), checkPortCmd);
+ if (checkPortSSH.run(3000)) {
+ String netInfo = checkPortSSH.getStdoutResponse();
+ log.info("agent node {} port check return output\n: {}",
node.getId(), netInfo);
+
+ if (netInfo != null && !netInfo.trim().isEmpty()) {
+ log.error("agent node {} port {} already in use:\n {}",
node.getId(), node.getAgentPort(), netInfo);
+ return false;
+ }
+ } else if (checkPortSSH.getExitCode() != 1) { //exit 1 when grep
failed, other exit code is exception
+ log.warn("run check port cmd failed");
+ throw new Exception("check agent port scrpit execution exception:"
+ checkPortSSH.getErrorResponse());
+ }
+ log.info("{} node {}:{} is available", node.getId(), node.getHost(),
node.getAgentPort());
+ return true;
+ }
+
+ public void checkSshConnect(ResourceNodeEntity node,
AgentInstallEventConfigInfo configInfo)
+ throws Exception {
+
+ log.info("check {} node {} ssh connect", node.getId(), node.getHost());
+ String sshkey = String.format("sshkey-%d", node.getId());
+ File sshKeyFile = SSH.buildSshKeyFile(sshkey);
+ SSH.writeSshKeyFile(configInfo.getSshKey(), sshKeyFile);
+
+ SSH checkSSH = new SSH(configInfo.getSshUser(),
configInfo.getSshPort(),
+ sshKeyFile.getAbsolutePath(), node.getHost(), "echo ok");
+
+ // if ssh-key is invalid, ssh commond will enter interactive mode that
will block the process
+ // user not exists(255), user error(need input password), ssh port
error(1), host error(1)
+ if (checkSSH.run(3000)) {
+ log.info("ssh connect {} node {} success", node.getId(),
node.getHost());
+ } else {
+ throw new Exception(String.format("ssh connect node %s failed:
error code %d, error msg: %s",
+ node.getHost(), checkSSH.getExitCode(),
+ checkSSH.getErrorResponse()));
+ }
+ }
+
+ public void checkStopScriptExist(ResourceNodeEntity node,
AgentInstallEventConfigInfo configInfo)
+ throws Exception {
+ log.info("check node {} agent stop script", node.getId());
+ checkPathExist(node, configInfo, Paths.get(node.getAgentInstallDir(),
"agent", AGENT_STOP_SCRIPT).toString());
+ }
+
+ public void checkPathExist(ResourceNodeEntity node,
AgentInstallEventConfigInfo configInfo,
+ String path) throws Exception {
+
+ String sshkey = String.format("sshkey-%d", node.getId());
+ File sshKeyFile = SSH.buildSshKeyFile(sshkey);
+ SSH.writeSshKeyFile(configInfo.getSshKey(), sshKeyFile);
+
+ String checkPathExistCmd = "if test -e " + path + "; then echo ok;
else exit 1; fi";
+ SSH checkPathSSH = new SSH(configInfo.getSshUser(),
configInfo.getSshPort(),
+ sshKeyFile.getAbsolutePath(), node.getHost(),
checkPathExistCmd);
+ if (!checkPathSSH.run()) {
+ if (checkPathSSH.getExitCode() == 1) {
+ log.error("path {} is not exist on node {}", path,
node.getId());
+ throw new Exception(String.format("node %s does not have path:
%s", node.getHost(), path));
+ } else {
+ throw new Exception("check path exception:" +
checkPathSSH.getErrorResponse());
+ }
+ }
+ }
+
+ private void installEventProcess(ResourceNodeEntity node,
AgentInstallEventConfigInfo configInfo,
+ HeartBeatEventEntity
agentInstallAgentEntity) {
+ if
(!agentInstallAgentEntity.getType().equals(HeartBeatEventType.AGENT_INSTALL.name()))
{
+ log.warn("agent has been installed on {} node {}", node.getId(),
node.getHost());
+ return;
+ }
+
+ if
(!agentInstallAgentEntity.getStatus().equals(HeartBeatEventResultType.INIT.name()))
{
+ log.warn("agent is being installed on {} node {}", node.getId(),
node.getHost());
+ return;
+ }
+
// AGENT_INSTALL heartbeat handle
// TODO: Before each Stage operation, it is necessary to judge whether
the event has been cancelled
// ACCESS_AUTH stage
@@ -218,19 +349,6 @@ public class ResourceNodeAndAgentManager {
// agent start
// AGENT_START stage
- try {
- if (!isAvailableAgentPort(node, configInfo)) {
- log.error("port {}:{} already in use", configInfo.getHost(),
configInfo.getAgentPort());
- updateFailResult(String.format("agent port %s:%d already in
use",
- configInfo.getHost(),
configInfo.getAgentPort()),
- AgentInstallEventStage.AGENT_DEPLOY.getStage(),
agentInstallAgentEntity);
- return;
- }
- } catch (Exception e) {
- e.printStackTrace();
- log.error("check agent port exception, skip port check");
- }
-
String agentInstallHome = configInfo.getInstallDir() + File.separator
+ "agent";
log.info("to start agent with port {}", configInfo.getAgentPort());
@@ -240,7 +358,7 @@ public class ResourceNodeAndAgentManager {
SSH startSsh = new SSH(configInfo.getSshUser(),
configInfo.getSshPort(),
sshKeyFile.getAbsolutePath(), configInfo.getHost(), cmd);
if (!startSsh.run()) {
- log.error("agent start failed:{}", ssh.getErrorResponse());
+ log.error("agent start failed:{}", startSsh.getErrorResponse());
updateFailResult(AgentInstallEventStage.AGENT_START.getError(),
AgentInstallEventStage.AGENT_START.getStage(),
agentInstallAgentEntity);
return;
@@ -250,27 +368,46 @@ public class ResourceNodeAndAgentManager {
AgentInstallEventStage.AGENT_START.getStage(),
agentInstallAgentEntity);
}
- public boolean checkAgentOperation(ResourceNodeEntity node) {
- long eventId = node.getCurrentEventId();
+ private void uninstallEventProcess(ResourceNodeEntity node,
AgentUnInstallEventConfigInfo configInfo,
+ HeartBeatEventEntity
agentUninstallAgentEntity) {
+ if
(!agentUninstallAgentEntity.getType().equals(HeartBeatEventType.AGENT_STOP.name()))
{
+ log.warn("agent no need to stop on {} node {}", node.getId(),
node.getHost());
+ return;
+ }
- if (eventId < 1L) {
- log.warn("The node no have agent");
- return false;
- } else {
- HeartBeatEventEntity eventEntity =
heartBeatEventRepository.findById(eventId).get();
- // If the agent has been successfully installed and a new agent
request operation has been performed,
- // the installation cannot be performed again
- if
(!eventEntity.getType().equals(HeartBeatEventType.AGENT_INSTALL.name())) {
- return true;
- }
+ if
(!agentUninstallAgentEntity.getStatus().equals(HeartBeatEventResultType.INIT.name()))
{
+ log.warn("agent is being stopped on {} node {}", node.getId(),
node.getHost());
+ return;
+ }
+ // warning: multithreaded write operation
+ String sshkey = String.format("sshkey-%d", node.getId());
+ File sshKeyFile = SSH.buildSshKeyFile(sshkey);
+ SSH.writeSshKeyFile(configInfo.getSshKey(), sshKeyFile);
- if (eventEntity.isCompleted() &&
eventEntity.getStatus().equals(HeartBeatEventResultType.SUCCESS.name())) {
- return true;
- }
+ String agentInstallHome = configInfo.getInstallDir() + File.separator
+ "agent";
- log.warn("Agent has not been installed successfully");
- return false;
+ log.info("to uninstall {} agent", configInfo.getHost());
+// String command = "cd %s && sh %s && rm -rf %s";
+// String cmd = String.format(command, agentInstallHome,
AGENT_STOP_SCRIPT, agentInstallHome);
+ String command = "cd %s && sh %s";
+ String cmd = String.format(command, agentInstallHome,
AGENT_STOP_SCRIPT);
+ SSH stopSsh = new SSH(configInfo.getSshUser(), configInfo.getSshPort(),
+ sshKeyFile.getAbsolutePath(), configInfo.getHost(), cmd);
+ if (!stopSsh.run()) {
+ log.error("agent stop failed:{}", stopSsh.getErrorResponse());
+ agentUninstallAgentEntity.setCompleted(true);
+
agentUninstallAgentEntity.setStatus(HeartBeatEventResultType.FAIL.name());
+ heartBeatEventRepository.save(agentUninstallAgentEntity);
+
+ updateFailResult(AgentUnInstallEventStage.AGENT_STOP.getError() +
stopSsh.getErrorResponse(),
+ AgentUnInstallEventStage.AGENT_STOP.getStage(),
agentUninstallAgentEntity);
}
+
+ log.info("node {} success to uninstall agent", node.getHost());
+
+ agentUninstallAgentEntity.setCompleted(true);
+
agentUninstallAgentEntity.setStatus(HeartBeatEventResultType.SUCCESS.name());
+ heartBeatEventRepository.save(agentUninstallAgentEntity);
}
private void updateProcessingResult(String result, int stage,
HeartBeatEventEntity eventEntity) {
diff --git
a/manager/dm-server/src/main/java/org/apache/doris/stack/shell/BaseCommand.java
b/manager/dm-server/src/main/java/org/apache/doris/stack/shell/BaseCommand.java
index 2164bed..dc941ab 100644
---
a/manager/dm-server/src/main/java/org/apache/doris/stack/shell/BaseCommand.java
+++
b/manager/dm-server/src/main/java/org/apache/doris/stack/shell/BaseCommand.java
@@ -23,6 +23,7 @@ import org.apache.commons.lang3.StringUtils;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
@@ -51,8 +52,12 @@ public abstract class BaseCommand {
}
public boolean run() {
+ return run(0);
+ }
+
+ public boolean run(long timeoutMs) {
buildCommand();
- log.info("run command: {}", StringUtils.join(resultCommand, " "));
+ log.info("run command: {} ,timeout time: {}ms",
StringUtils.join(resultCommand, " "), timeoutMs);
ProcessBuilder pb = new ProcessBuilder(resultCommand);
Process process = null;
BufferedReader stdoutBufferedReader = null;
@@ -65,7 +70,18 @@ public abstract class BaseCommand {
stdoutResponse =
stdoutBufferedReader.lines().parallel().collect(Collectors.joining(System.lineSeparator()));
errorResponse =
errorBufferedReader.lines().parallel().collect(Collectors.joining(System.lineSeparator()));
- exitCode = process.waitFor();
+ if (timeoutMs <= 0) {
+ exitCode = process.waitFor();
+ } else {
+ boolean isExit = process.waitFor(timeoutMs,
TimeUnit.MICROSECONDS);
+ if (!isExit) {
+ exitCode = 124; // the same as timeout command
+ log.error("command run timeout in {}ms", timeoutMs);
+ return false;
+ }
+ exitCode = process.exitValue();
+ }
+
if (exitCode == 0) {
return true;
} else {
diff --git
a/manager/dm-server/src/main/java/org/apache/doris/stack/shell/SSH.java
b/manager/dm-server/src/main/java/org/apache/doris/stack/shell/SSH.java
index 456eec0..48a001e 100644
--- a/manager/dm-server/src/main/java/org/apache/doris/stack/shell/SSH.java
+++ b/manager/dm-server/src/main/java/org/apache/doris/stack/shell/SSH.java
@@ -95,9 +95,11 @@ public class SSH extends BaseCommand {
protected void buildCommand() {
String[] command = new String[]{"ssh",
- "-o", "ConnectTimeOut=60",
+ "-o", "ConnectTimeOut=20",
"-o", "StrictHostKeyChecking=no",
"-o", "BatchMode=yes",
+ // close password auth in case of blocking program when ssh
auth failed
+ "-o", "PasswordAuthentication=no",
"-i", this.sshKeyFile,
"-p", String.valueOf(this.sshPort),
this.user + "@" + this.host, this.command
diff --git
a/manager/dm-server/src/main/java/org/apache/doris/stack/util/Constants.java
b/manager/dm-server/src/main/java/org/apache/doris/stack/util/Constants.java
index 896e1d0..0869a27 100644
--- a/manager/dm-server/src/main/java/org/apache/doris/stack/util/Constants.java
+++ b/manager/dm-server/src/main/java/org/apache/doris/stack/util/Constants.java
@@ -19,6 +19,8 @@ package org.apache.doris.stack.util;
public class Constants {
public static final String KEY_DORIS_AGENT_START_SCRIPT =
"bin/agent_start.sh";
+ public static final String KEY_DORIS_AGENT_STOP_SCRIPT =
"bin/agent_stop.sh";
+
public static final String KEY_DORIS_AGENT_CONFIG_PATH =
"config/application.properties";
public static final String KEY_FE_QUERY_PORT = "query_port";
public static final String KEY_FE_EDIT_LOG_PORT = "edit_log_port";
diff --git
a/manager/manager-server/src/main/java/org/apache/doris/stack/service/PaloUserSpaceService.java
b/manager/manager-server/src/main/java/org/apache/doris/stack/service/PaloUserSpaceService.java
index 4f8f3eb..85dcff7 100644
---
a/manager/manager-server/src/main/java/org/apache/doris/stack/service/PaloUserSpaceService.java
+++
b/manager/manager-server/src/main/java/org/apache/doris/stack/service/PaloUserSpaceService.java
@@ -42,7 +42,7 @@ public class PaloUserSpaceService extends BaseService {
@Autowired
private DorisManagerUserSpaceComponent userSpaceComponent;
- @Transactional
+ @Transactional(rollbackFor = Exception.class)
public long create(NewUserSpaceCreateReq createReq, CoreUserEntity user)
throws Exception {
return userSpaceComponent.create(createReq, user.getFirstName());
}
@@ -62,7 +62,7 @@ public class PaloUserSpaceService extends BaseService {
* @return
* @throws Exception
*/
- @Transactional
+ @Transactional(rollbackFor = Exception.class)
public NewUserSpaceInfo update(CoreUserEntity user, int spaceId,
NewUserSpaceCreateReq updateReq) throws Exception {
return userSpaceComponent.update(user, spaceId, updateReq);
}
@@ -93,7 +93,7 @@ public class PaloUserSpaceService extends BaseService {
* @param spaceId
* @throws Exception
*/
- @Transactional
+ @Transactional(rollbackFor = Exception.class)
public void deleteSpace(int spaceId) throws Exception {
userSpaceComponent.deleteSpace(spaceId);
}
diff --git
a/manager/resource-common/src/main/java/org/apache/doris/stack/dao/ClusterInstanceRepository.java
b/manager/resource-common/src/main/java/org/apache/doris/stack/dao/ClusterInstanceRepository.java
index b77486e..53c5ce7 100644
---
a/manager/resource-common/src/main/java/org/apache/doris/stack/dao/ClusterInstanceRepository.java
+++
b/manager/resource-common/src/main/java/org/apache/doris/stack/dao/ClusterInstanceRepository.java
@@ -21,8 +21,10 @@ import org.apache.doris.stack.entity.ClusterInstanceEntity;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.data.jpa.repository.JpaRepository;
+import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
+import org.springframework.transaction.annotation.Transactional;
import java.util.List;
@@ -46,6 +48,8 @@ public interface ClusterInstanceRepository extends
JpaRepository<ClusterInstance
void deleteById(Long id);
@Override
+ @Transactional
+ @Modifying
@CacheEvict(value = "cluster_instance", key = "#entity.nodeId")
void delete(ClusterInstanceEntity entity);
}
diff --git
a/manager/resource-common/src/main/java/org/apache/doris/stack/dao/ClusterModuleServiceRepository.java
b/manager/resource-common/src/main/java/org/apache/doris/stack/dao/ClusterModuleServiceRepository.java
index 81d8144..81fc551 100644
---
a/manager/resource-common/src/main/java/org/apache/doris/stack/dao/ClusterModuleServiceRepository.java
+++
b/manager/resource-common/src/main/java/org/apache/doris/stack/dao/ClusterModuleServiceRepository.java
@@ -22,6 +22,7 @@ import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
+import org.springframework.transaction.annotation.Transactional;
import java.util.List;
@@ -34,6 +35,7 @@ public interface ClusterModuleServiceRepository extends
JpaRepository<ClusterMod
@Query("select c from ClusterModuleServiceEntity c where c.clusterId =
:clusterId")
List<ClusterModuleServiceEntity> getByClusterId(@Param("clusterId") long
clusterId);
+ @Transactional
@Modifying
@Query("delete from ClusterModuleServiceEntity c where c.moduleId =
:moduleId")
void deleteByModuleId(@Param("moduleId") long moduleId);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]