This is an automated email from the ASF dual-hosted git repository.
aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 9969932589 [INLONG-11157][Manager] Asynchronous processing agent installation (#11159)
9969932589 is described below
commit 9969932589fb92ba26a1f1897a124b6dda476993
Author: fuweng11 <[email protected]>
AuthorDate: Mon Sep 23 14:06:27 2024 +0800
[INLONG-11157][Manager] Asynchronous processing agent installation (#11159)
---
.../inlong/manager/common/enums/NodeStatus.java | 10 +++-
.../dao/mapper/InlongClusterNodeEntityMapper.java | 3 +-
.../mappers/InlongClusterNodeEntityMapper.xml | 9 ++-
.../service/cluster/InlongClusterServiceImpl.java | 69 +++++++++++++++++++---
.../node/AgentClusterNodeInstallOperator.java | 18 +++++-
5 files changed, 96 insertions(+), 13 deletions(-)
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/NodeStatus.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/NodeStatus.java
index b240c7217a..7ee92cc537 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/NodeStatus.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/NodeStatus.java
@@ -26,7 +26,15 @@ public enum NodeStatus {
NORMAL(1),
- HEARTBEAT_TIMEOUT(2);
+ HEARTBEAT_TIMEOUT(2),
+
+ INSTALLING(3),
+
+ INSTALL_FAILED(4),
+
+ INSTALL_SUCCESS(5),
+
+ UNLOAD_FAILED(6);
int status;
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterNodeEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterNodeEntityMapper.java
index f168de98d9..2b87842e99 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterNodeEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterNodeEntityMapper.java
@@ -55,7 +55,8 @@ public interface InlongClusterNodeEntityMapper {
*/
int updateStatus(@Param("id") Integer id, @Param("nextStatus") Integer
nextStatus, @Param("status") Integer status);
- int updateOperateLogById(@Param("id") Integer id, @Param("operateLog")
String operateLog);
+ int updateOperateLogById(@Param("id") Integer id, @Param("nextStatus")
Integer nextStatus,
+ @Param("operateLog") String operateLog);
int deleteById(Integer id);
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
index 2afe00ee6b..7604362106 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
@@ -256,7 +256,14 @@
</update>
<update id="updateOperateLogById">
update inlong_cluster_node
- set operate_log = #{operateLog,jdbcType=LONGVARCHAR}
+ <set>
+ <if test="nextStatus != null">
+ status = #{nextStatus,jdbcType=INTEGER},
+ </if>
+ <if test="operateLog != null">
+ operate_log = #{operateLog,jdbcType=LONGVARCHAR}
+ </if>
+ </set>
where id = #{id,jdbcType=INTEGER}
and is_deleted = 0
</update>
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
index f399455803..7882e29c12 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
@@ -90,6 +90,7 @@ import com.github.pagehelper.PageHelper;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.gson.Gson;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
@@ -101,6 +102,8 @@ import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Transactional;
+import javax.annotation.PostConstruct;
+
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
@@ -113,6 +116,12 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static
org.apache.inlong.manager.pojo.cluster.InlongClusterTagExtParam.packExtParams;
@@ -126,6 +135,15 @@ public class InlongClusterServiceImpl implements
InlongClusterService {
private static final Logger LOGGER =
LoggerFactory.getLogger(InlongClusterServiceImpl.class);
private static final Gson GSON = new Gson();
+ private final ExecutorService executorService = new ThreadPoolExecutor(
+ 5,
+ 10,
+ 10L,
+ TimeUnit.SECONDS,
+ new ArrayBlockingQueue<>(100),
+ new
ThreadFactoryBuilder().setNameFormat("agent-install-%s").build(),
+ new CallerRunsPolicy());
+ private final LinkedBlockingQueue<ClusterNodeRequest>
pendingInstallRequests = new LinkedBlockingQueue<>();
@Autowired
private InlongGroupEntityMapper groupMapper;
@@ -158,6 +176,13 @@ public class InlongClusterServiceImpl implements
InlongClusterService {
@Autowired
private DataProxyConfigRepository proxyRepository;
+ @PostConstruct
+ private void startInstallTask() {
+ InstallTaskRunnable installTaskRunnable = new InstallTaskRunnable();
+ this.executorService.execute(installTaskRunnable);
+ LOGGER.info("install task started successfully");
+ }
+
@Override
public Integer saveTag(ClusterTagRequest request, String operator) {
LOGGER.debug("begin to save cluster tag {}", request);
@@ -692,9 +717,7 @@ public class InlongClusterServiceImpl implements
InlongClusterService {
Integer id = instance.saveOpt(request, operator);
if (request.getIsInstall()) {
request.setId(id);
- InlongClusterNodeInstallOperator clusterNodeInstallOperator =
clusterNodeInstallOperatorFactory.getInstance(
- request.getType());
- clusterNodeInstallOperator.install(request, operator);
+ pendingInstallRequests.add(request);
}
return id;
}
@@ -810,7 +833,6 @@ public class InlongClusterServiceImpl implements
InlongClusterService {
}
@Override
- @Transactional(rollbackFor = Throwable.class, isolation =
Isolation.REPEATABLE_READ)
public Boolean updateNode(ClusterNodeRequest request, String operator) {
LOGGER.debug("begin to update inlong cluster node={}", request);
Preconditions.expectNotNull(request, "inlong cluster node cannot be
empty");
@@ -843,9 +865,9 @@ public class InlongClusterServiceImpl implements
InlongClusterService {
InlongClusterNodeOperator instance =
clusterNodeOperatorFactory.getInstance(request.getType());
instance.updateOpt(request, operator);
if (request.getIsInstall()) {
- InlongClusterNodeInstallOperator clusterNodeInstallOperator =
clusterNodeInstallOperatorFactory.getInstance(
- request.getType());
- clusterNodeInstallOperator.install(request, operator);
+ // when reinstall set install to false
+ request.setIsInstall(false);
+ pendingInstallRequests.add(request);
}
return true;
}
@@ -1381,4 +1403,37 @@ public class InlongClusterServiceImpl implements
InlongClusterService {
request.setClusterTags(entity.getClusterTags());
}
}
+
+ private class InstallTaskRunnable implements Runnable {
+
+ private static final int WAIT_SECONDS = 60 * 1000;
+
+ @Override
+ public void run() {
+ while (true) {
+ try {
+ processInstall();
+ Thread.sleep(WAIT_SECONDS);
+ } catch (Exception e) {
+ LOGGER.error("exception occurred when install", e);
+ }
+ }
+ }
+
+ @Transactional(rollbackFor = Throwable.class)
+ public void processInstall() {
+ if (pendingInstallRequests.isEmpty()) {
+ return;
+ }
+ ClusterNodeRequest request = pendingInstallRequests.poll();
+ InlongClusterNodeInstallOperator clusterNodeInstallOperator =
clusterNodeInstallOperatorFactory.getInstance(
+ request.getType());
+ if (request.getIsInstall()) {
+ clusterNodeInstallOperator.install(request,
request.getCurrentUser());
+ } else {
+ clusterNodeInstallOperator.reInstall(request,
request.getCurrentUser());
+ }
+
+ }
+ }
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/AgentClusterNodeInstallOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/AgentClusterNodeInstallOperator.java
index 06fd1db3ad..abf8a895cd 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/AgentClusterNodeInstallOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/AgentClusterNodeInstallOperator.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.service.cluster.node;
import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.enums.ModuleType;
+import org.apache.inlong.manager.common.enums.NodeStatus;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.AESUtils;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
@@ -96,12 +97,17 @@ public class AgentClusterNodeInstallOperator implements
InlongClusterNodeInstall
public boolean install(ClusterNodeRequest clusterNodeRequest, String
operator) {
LOGGER.info("begin to insert agent cluster node={}",
clusterNodeRequest);
try {
+
clusterNodeEntityMapper.updateOperateLogById(clusterNodeRequest.getId(),
NodeStatus.INSTALLING.getStatus(),
+ "begin to install");
AgentClusterNodeRequest request = (AgentClusterNodeRequest)
clusterNodeRequest;
deployInstaller(request, operator);
String startCmd = agentInstallPath + INSTALLER_START_CMD;
commandExecutor.execRemote(request, startCmd);
+
clusterNodeEntityMapper.updateOperateLogById(clusterNodeRequest.getId(),
+ NodeStatus.INSTALL_SUCCESS.getStatus(), "success to
install");
} catch (Exception e) {
-
clusterNodeEntityMapper.updateOperateLogById(clusterNodeRequest.getId(),
e.getMessage());
+
clusterNodeEntityMapper.updateOperateLogById(clusterNodeRequest.getId(),
+ NodeStatus.INSTALL_FAILED.getStatus(), e.getMessage());
String errMsg = String.format("install agent cluster node failed
for ip=%s", clusterNodeRequest.getIp());
LOGGER.error(errMsg, e);
throw new BusinessException(errMsg);
@@ -114,13 +120,18 @@ public class AgentClusterNodeInstallOperator implements
InlongClusterNodeInstall
public boolean reInstall(ClusterNodeRequest clusterNodeRequest, String
operator) {
LOGGER.info("begin to reInstall agent cluster node={}",
clusterNodeRequest);
try {
+
clusterNodeEntityMapper.updateOperateLogById(clusterNodeRequest.getId(),
NodeStatus.INSTALLING.getStatus(),
+ "begin to reinstall");
AgentClusterNodeRequest request = (AgentClusterNodeRequest)
clusterNodeRequest;
commandExecutor.rmDir(request, agentInstallPath.substring(0,
agentInstallPath.lastIndexOf(File.separator)));
deployInstaller(request, operator);
String reStartCmd = agentInstallPath + INSTALLER_RESTART_CMD;
commandExecutor.execRemote(request, reStartCmd);
+
clusterNodeEntityMapper.updateOperateLogById(clusterNodeRequest.getId(),
NodeStatus.NORMAL.getStatus(),
+ "success to reinstall");
} catch (Exception e) {
-
clusterNodeEntityMapper.updateOperateLogById(clusterNodeRequest.getId(),
e.getMessage());
+
clusterNodeEntityMapper.updateOperateLogById(clusterNodeRequest.getId(),
+ NodeStatus.INSTALL_FAILED.getStatus(), e.getMessage());
String errMsg = String.format("reInstall agent cluster node failed
for ip=%s", clusterNodeRequest.getIp());
LOGGER.error(errMsg, e);
throw new BusinessException(errMsg);
@@ -140,7 +151,8 @@ public class AgentClusterNodeInstallOperator implements
InlongClusterNodeInstall
commandExecutor.execRemote(request, stopCmd);
commandExecutor.rmDir(request, agentInstallPath.substring(0,
agentInstallPath.lastIndexOf(File.separator)));
} catch (Exception e) {
-
clusterNodeEntityMapper.updateOperateLogById(clusterNodeEntity.getId(),
e.getMessage());
+
clusterNodeEntityMapper.updateOperateLogById(clusterNodeEntity.getId(),
+ NodeStatus.UNLOAD_FAILED.getStatus(), e.getMessage());
String errMsg = String.format("unload agent cluster node failed
for ip=%s", clusterNodeEntity.getIp());
LOGGER.error(errMsg, e);
throw new BusinessException(errMsg);