This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new bd95eebd85 [INLONG-11387][Manager] Support multi-threaded processing agent installation (#11388)
bd95eebd85 is described below

commit bd95eebd8548d5f2885b9b8b8fba818e0077cf7c
Author: fuweng11 <[email protected]>
AuthorDate: Tue Oct 22 21:01:24 2024 +0800

    [INLONG-11387][Manager] Support multi-threaded processing agent installation (#11388)
---
 .../mappers/InlongClusterNodeEntityMapper.xml      |  3 +
 .../service/cluster/InlongClusterServiceImpl.java  | 66 ++++++++++++++--------
 .../service/heartbeat/HeartbeatManager.java        |  9 +++
 3 files changed, 53 insertions(+), 25 deletions(-)

diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
index 7604362106..2b4be69d35 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
@@ -130,6 +130,9 @@
                 or port like CONCAT('%', #{keyword}, '%')
                 )
             </if>
+            <if test="status != null and status != ''">
+                and status = #{status, jdbcType=INTEGER}
+            </if>
         </where>
         order by modify_time desc
     </select>
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
index 2b18ba582f..e2be48a4e0 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
@@ -117,14 +117,19 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import static 
org.apache.inlong.manager.common.consts.InlongConstants.ALIVE_TIME_MS;
+import static 
org.apache.inlong.manager.common.consts.InlongConstants.CORE_POOL_SIZE;
+import static 
org.apache.inlong.manager.common.consts.InlongConstants.MAX_POOL_SIZE;
+import static 
org.apache.inlong.manager.common.consts.InlongConstants.QUEUE_SIZE;
 import static 
org.apache.inlong.manager.pojo.cluster.InlongClusterTagExtParam.packExtParams;
 import static 
org.apache.inlong.manager.pojo.cluster.InlongClusterTagExtParam.unpackExtParams;
 
@@ -137,11 +142,11 @@ public class InlongClusterServiceImpl implements 
InlongClusterService {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(InlongClusterServiceImpl.class);
     private static final Gson GSON = new Gson();
     private final ExecutorService executorService = new ThreadPoolExecutor(
-            5,
-            10,
-            10L,
-            TimeUnit.SECONDS,
-            new ArrayBlockingQueue<>(100),
+            CORE_POOL_SIZE,
+            MAX_POOL_SIZE,
+            ALIVE_TIME_MS,
+            TimeUnit.MILLISECONDS,
+            new LinkedBlockingQueue<>(QUEUE_SIZE),
             new 
ThreadFactoryBuilder().setNameFormat("agent-install-%s").build(),
             new CallerRunsPolicy());
     private final LinkedBlockingQueue<ClusterNodeRequest> 
pendingInstallRequests = new LinkedBlockingQueue<>();
@@ -179,11 +184,18 @@ public class InlongClusterServiceImpl implements 
InlongClusterService {
 
     @PostConstruct
     private void startInstallTask() {
-        InstallTaskRunnable installTaskRunnable = new InstallTaskRunnable();
-        this.executorService.execute(installTaskRunnable);
+        processInstall();
+        setReloadTimer();
         LOGGER.info("install task started successfully");
     }
 
+    private void setReloadTimer() {
+        ScheduledExecutorService executorService = 
Executors.newSingleThreadScheduledExecutor();
+        long reloadInterval = 60000L;
+        executorService.scheduleWithFixedDelay(this::processInstall, 
reloadInterval, reloadInterval,
+                TimeUnit.MILLISECONDS);
+    }
+
     @Override
     public Integer saveTag(ClusterTagRequest request, String operator) {
         LOGGER.debug("begin to save cluster tag {}", request);
@@ -731,6 +743,7 @@ public class InlongClusterServiceImpl implements 
InlongClusterService {
         Integer id = instance.saveOpt(request, operator);
         if (request.getIsInstall()) {
             request.setId(id);
+            clusterNodeMapper.updateOperateLogById(id, 
NodeStatus.INSTALLING.getStatus(), "begin to install");
             pendingInstallRequests.add(request);
         }
         return id;
@@ -881,6 +894,8 @@ public class InlongClusterServiceImpl implements 
InlongClusterService {
         if (request.getIsInstall()) {
             // when reinstall set install to false
             request.setIsInstall(false);
+            clusterNodeMapper.updateOperateLogById(request.getId(), 
NodeStatus.INSTALLING.getStatus(),
+                    "begin to re install");
             pendingInstallRequests.add(request);
         }
         return true;
@@ -1434,34 +1449,35 @@ public class InlongClusterServiceImpl implements 
InlongClusterService {
 
     private class InstallTaskRunnable implements Runnable {
 
-        private static final int WAIT_SECONDS = 60 * 1000;
+        private ClusterNodeRequest request;
 
-        @Override
-        public void run() {
-            while (true) {
-                try {
-                    processInstall();
-                    Thread.sleep(WAIT_SECONDS);
-                } catch (Exception e) {
-                    LOGGER.error("exception occurred when install", e);
-                }
-            }
+        public InstallTaskRunnable(ClusterNodeRequest request) {
+            this.request = request;
         }
 
-        @Transactional(rollbackFor = Throwable.class)
-        public void processInstall() {
-            if (pendingInstallRequests.isEmpty()) {
+        @Override
+        public void run() {
+            if (request == null) {
                 return;
             }
-            ClusterNodeRequest request = pendingInstallRequests.poll();
-            InlongClusterNodeInstallOperator clusterNodeInstallOperator = 
clusterNodeInstallOperatorFactory.getInstance(
-                    request.getType());
+            InlongClusterNodeInstallOperator clusterNodeInstallOperator =
+                    
clusterNodeInstallOperatorFactory.getInstance(request.getType());
             if (request.getIsInstall()) {
                 clusterNodeInstallOperator.install(request, 
request.getCurrentUser());
             } else {
                 clusterNodeInstallOperator.reInstall(request, 
request.getCurrentUser());
             }
+        }
+    }
 
+    @Transactional(rollbackFor = Throwable.class)
+    public void processInstall() {
+        LOGGER.info("begin to process install task");
+        while (!pendingInstallRequests.isEmpty()) {
+            ClusterNodeRequest request = pendingInstallRequests.poll();
+            InstallTaskRunnable installTaskRunnable = new 
InstallTaskRunnable(request);
+            executorService.execute(installTaskRunnable);
         }
+        LOGGER.info("success to process install task");
     }
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
index e8add054d9..fc51b92cdf 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.manager.service.heartbeat;
 
+import org.apache.inlong.common.enums.ComponentTypeEnum;
 import org.apache.inlong.common.enums.NodeSrvStatus;
 import org.apache.inlong.common.heartbeat.AbstractHeartbeatManager;
 import org.apache.inlong.common.heartbeat.AddressInfo;
@@ -211,6 +212,10 @@ public class HeartbeatManager implements 
AbstractHeartbeatManager {
             } else {
                 heartbeatMsg.setProtocolType(protocolType);
             }
+            if (Objects.equals(heartbeat.getComponentType(), 
ComponentTypeEnum.Agent.getType())) {
+                heartbeatMsg.setProtocolType(null);
+                heartbeatMsg.setPort(null);
+            }
             // uninstall node event
             if 
(NodeSrvStatus.SERVICE_UNINSTALL.equals(heartbeat.getNodeSrvStatus())) {
                 InlongClusterNodeEntity clusterNode = 
getClusterNode(clusterInfo, heartbeatMsg);
@@ -296,6 +301,10 @@ public class HeartbeatManager implements 
AbstractHeartbeatManager {
             } else {
                 heartbeatMsg.setProtocolType(protocolType);
             }
+            if (Objects.equals(heartbeat.getComponentType(), 
ComponentTypeEnum.Agent.getType())) {
+                heartbeatMsg.setProtocolType(null);
+                heartbeatMsg.setPort(null);
+            }
             InlongClusterNodeEntity clusterNode = getClusterNode(clusterInfo, 
heartbeatMsg);
             if (clusterNode == null) {
                 log.error("not found any cluster node by type={}, ip={}, 
port={}",

Reply via email to