This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new f112ec05cf [INLONG-10558][Manager] Support determining whether to 
issue agent tasks based on the MD5 value (#10559)
f112ec05cf is described below

commit f112ec05cf4045fed46de122e4b8756aec32247d
Author: fuweng11 <[email protected]>
AuthorDate: Thu Jul 4 19:28:54 2024 +0800

    [INLONG-10558][Manager] Support determining whether to issue agent tasks 
based on the MD5 value (#10559)
    
    * [INLONG-10558][Manager] Support determining whether to issue agent tasks 
based on the MD5 value
---
 .../inlong/agent/installer/ManagerFetcher.java     |   4 +-
 .../inlong/common/pojo/agent/AgentConfigInfo.java  |   6 +-
 .../common/pojo/agent/AgentConfigRequest.java      |   3 +
 .../InstallerCode.java => AgentResponseCode.java}  |  14 +--
 .../inlong/common/pojo/agent/TaskRequest.java      |   2 +
 .../inlong/common/pojo/agent/TaskResult.java       |   2 +
 .../common/pojo/agent/installer/ConfigResult.java  |   4 +-
 .../service/core/impl/AgentServiceImpl.java        | 126 +++++++++++++--------
 8 files changed, 105 insertions(+), 56 deletions(-)

diff --git 
a/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/ManagerFetcher.java
 
b/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/ManagerFetcher.java
index 68189dc6d1..222d2e9d83 100644
--- 
a/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/ManagerFetcher.java
+++ 
b/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/ManagerFetcher.java
@@ -23,9 +23,9 @@ import 
org.apache.inlong.agent.installer.conf.InstallerConfiguration;
 import org.apache.inlong.agent.utils.AgentUtils;
 import org.apache.inlong.agent.utils.HttpManager;
 import org.apache.inlong.agent.utils.ThreadUtils;
+import org.apache.inlong.common.pojo.agent.AgentResponseCode;
 import org.apache.inlong.common.pojo.agent.installer.ConfigRequest;
 import org.apache.inlong.common.pojo.agent.installer.ConfigResult;
-import org.apache.inlong.common.pojo.agent.installer.InstallerCode;
 
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
@@ -124,7 +124,7 @@ public class ManagerFetcher extends AbstractDaemon 
implements ProfileFetcher {
             while (isRunnable()) {
                 try {
                     ConfigResult config = getConfig();
-                    if (config != null && 
config.getCode().equals(InstallerCode.SUCCESS)) {
+                    if (config != null && 
config.getCode().equals(AgentResponseCode.SUCCESS)) {
                         manager.getModuleManager().submitConfig(config);
                     }
                 } catch (Throwable ex) {
diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/AgentConfigInfo.java
 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/AgentConfigInfo.java
index 7d5f8ce4df..2399c9657c 100644
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/AgentConfigInfo.java
+++ 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/AgentConfigInfo.java
@@ -26,11 +26,15 @@ import lombok.NoArgsConstructor;
  * The Agent config info.
  */
 @Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
 public class AgentConfigInfo {
 
+    AgentResponseCode code;
     private String zkUrl;
-
     private AgentClusterInfo cluster;
+    private String md5;
 
     @Data
     @Builder
diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/AgentConfigRequest.java
 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/AgentConfigRequest.java
index 857583bc86..282f1acf82 100644
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/AgentConfigRequest.java
+++ 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/AgentConfigRequest.java
@@ -34,4 +34,7 @@ public class AgentConfigRequest {
     private String clusterName;
 
     private String ip;
+
+    private String md5;
+
 }
diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/installer/InstallerCode.java
 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/AgentResponseCode.java
similarity index 78%
rename from 
inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/installer/InstallerCode.java
rename to 
inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/AgentResponseCode.java
index 60945e4894..826adee2b1 100644
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/installer/InstallerCode.java
+++ 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/AgentResponseCode.java
@@ -15,9 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.common.pojo.agent.installer;
+package org.apache.inlong.common.pojo.agent;
 
-public enum InstallerCode {
+public enum AgentResponseCode {
 
     SUCCESS(0, "SUCCESS", "Get module config success"),
     NO_UPDATE(1, "NO_UPDATE", "No update"),
@@ -27,16 +27,16 @@ public enum InstallerCode {
     private final String name;
     private final String desc;
 
-    InstallerCode(int id, String name, String desc) {
+    AgentResponseCode(int id, String name, String desc) {
         this.id = id;
         this.name = name;
         this.desc = desc;
     }
 
-    public static InstallerCode valueOf(int value) {
-        for (InstallerCode installerCode : InstallerCode.values()) {
-            if (installerCode.getId() == value) {
-                return installerCode;
+    public static AgentResponseCode valueOf(int value) {
+        for (AgentResponseCode agentResponseCode : AgentResponseCode.values()) 
{
+            if (agentResponseCode.getId() == value) {
+                return agentResponseCode;
             }
         }
         return UNKNOWN_ERROR;
diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskRequest.java
 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskRequest.java
index 7a98f2ac59..479e89a86d 100644
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskRequest.java
+++ 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskRequest.java
@@ -38,6 +38,8 @@ public class TaskRequest {
 
     private int pullJobType;
 
+    private String md5;
+
     private List<CommandEntity> commandInfo = new ArrayList<>();
 
 }
diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskResult.java
 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskResult.java
index 878a65dd12..2fcbec919d 100644
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskResult.java
+++ 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskResult.java
@@ -35,5 +35,7 @@ public class TaskResult {
 
     private List<CmdConfig> cmdConfigs;
     private List<DataConfig> dataConfigs;
+    private String md5;
+    AgentResponseCode code;
 
 }
\ No newline at end of file
diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/installer/ConfigResult.java
 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/installer/ConfigResult.java
index 14e504d211..b2c5426a81 100644
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/installer/ConfigResult.java
+++ 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/installer/ConfigResult.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.common.pojo.agent.installer;
 
+import org.apache.inlong.common.pojo.agent.AgentResponseCode;
+
 import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Data;
@@ -36,7 +38,7 @@ public class ConfigResult {
     /**
      * The code of the config result
      */
-    InstallerCode code;
+    AgentResponseCode code;
 
     /**
      * The md5 of the config result
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index edb36d884f..37a19beb14 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -25,6 +25,7 @@ import org.apache.inlong.common.enums.TaskStateEnum;
 import org.apache.inlong.common.enums.TaskTypeEnum;
 import org.apache.inlong.common.pojo.agent.AgentConfigInfo;
 import org.apache.inlong.common.pojo.agent.AgentConfigRequest;
+import org.apache.inlong.common.pojo.agent.AgentResponseCode;
 import org.apache.inlong.common.pojo.agent.CmdConfig;
 import org.apache.inlong.common.pojo.agent.DataConfig;
 import org.apache.inlong.common.pojo.agent.TaskRequest;
@@ -32,7 +33,6 @@ import org.apache.inlong.common.pojo.agent.TaskResult;
 import org.apache.inlong.common.pojo.agent.TaskSnapshotRequest;
 import org.apache.inlong.common.pojo.agent.installer.ConfigRequest;
 import org.apache.inlong.common.pojo.agent.installer.ConfigResult;
-import org.apache.inlong.common.pojo.agent.installer.InstallerCode;
 import org.apache.inlong.common.pojo.agent.installer.ModuleConfig;
 import org.apache.inlong.common.pojo.agent.installer.PackageConfig;
 import org.apache.inlong.common.pojo.dataproxy.DataProxyTopicInfo;
@@ -145,7 +145,10 @@ public class AgentServiceImpl implements AgentService {
             new CallerRunsPolicy());
 
     @Getter
-    private LoadingCache<TaskRequest, List<StreamSourceEntity>> taskCache;
+    private LoadingCache<TaskRequest, TaskResult> taskCache;
+    @Getter
+    private LoadingCache<AgentConfigRequest, AgentConfigInfo> agentConfigCache;
+
     @Getter
     private LoadingCache<ConfigRequest, ConfigResult> moduleConfigCache;
 
@@ -201,6 +204,9 @@ public class AgentServiceImpl implements AgentService {
         taskCache = Caffeine.newBuilder()
                 .expireAfterWrite(expireTime * 2L, TimeUnit.SECONDS)
                 .build(this::fetchTask);
+        agentConfigCache = Caffeine.newBuilder()
+                .expireAfterWrite(expireTime * 2L, TimeUnit.SECONDS)
+                .build(this::fetchAgentConfig);
         LOGGER.debug("start to reload config for installer.");
         try {
             moduleConfigCache = Caffeine.newBuilder()
@@ -331,28 +337,18 @@ public class AgentServiceImpl implements AgentService {
     @Override
     public AgentConfigInfo getAgentConfig(AgentConfigRequest request) {
         LOGGER.debug("begin to get agent config info for {}", request);
-        AgentConfigInfo agentConfigInfo = new AgentConfigInfo();
-        Set<String> tagSet = new HashSet<>(16);
-        
tagSet.addAll(Arrays.asList(request.getClusterTag().split(InlongConstants.COMMA)));
-        List<String> clusterTagList = new ArrayList<>(tagSet);
-        ClusterPageRequest pageRequest = ClusterPageRequest.builder()
-                .type(ClusterType.AGENT_ZK)
-                .clusterTagList(clusterTagList)
-                .build();
-        List<InlongClusterEntity> agentZkCluster = 
clusterMapper.selectByCondition(pageRequest);
-        if (CollectionUtils.isNotEmpty(agentZkCluster)) {
-            agentConfigInfo.setZkUrl(agentZkCluster.get(0).getUrl());
+        AgentConfigInfo agentConfigInfo = agentConfigCache.get(request);
+        if (agentConfigInfo == null) {
+            return null;
+        }
+        if (request.getMd5() == null || !Objects.equals(request.getMd5(), 
agentConfigInfo.getMd5())) {
+            return agentConfigInfo;
         }
-
-        AgentClusterInfo clusterInfo = (AgentClusterInfo) 
clusterService.getOne(
-                null, request.getClusterName(), ClusterType.AGENT);
-        agentConfigInfo.setCluster(AgentConfigInfo.AgentClusterInfo.builder()
-                .parentId(clusterInfo.getId())
-                .clusterName(clusterInfo.getName())
-                .build());
-
         LOGGER.debug("success to get agent config info for: {}, result: {}", 
request, agentConfigInfo);
-        return agentConfigInfo;
+        return AgentConfigInfo.builder()
+                .md5(agentConfigInfo.getMd5())
+                .code(AgentResponseCode.NO_UPDATE)
+                .build();
     }
 
     @Override
@@ -374,27 +370,19 @@ public class AgentServiceImpl implements AgentService {
     @Override
     public TaskResult getExistTaskConfig(TaskRequest request) {
         LOGGER.debug("begin to get all exist task by request={}", request);
-        // Query pending special commands
-        List<DataConfig> runningTaskConfig = Lists.newArrayList();
-        List<StreamSourceEntity> sourceEntities = taskCache.get(request);
-        try {
-            List<CmdConfig> cmdConfigs = getAgentCmdConfigs(request);
-            if (CollectionUtils.isEmpty(sourceEntities)) {
-                return 
TaskResult.builder().dataConfigs(runningTaskConfig).cmdConfigs(cmdConfigs).build();
-            }
-            for (StreamSourceEntity sourceEntity : sourceEntities) {
-                int op = getOp(sourceEntity.getStatus());
-                DataConfig dataConfig = getDataConfig(sourceEntity, op);
-                runningTaskConfig.add(dataConfig);
-            }
-            TaskResult taskResult = 
TaskResult.builder().dataConfigs(runningTaskConfig).cmdConfigs(cmdConfigs).build();
-
+        TaskResult taskResult = taskCache.get(request);
+        if (taskResult == null) {
+            return null;
+        }
+        if (request.getMd5() == null || !Objects.equals(request.getMd5(), 
taskResult.getMd5())) {
             return taskResult;
-        } catch (Exception e) {
-            LOGGER.error("get all exist task failed:", e);
-            throw new BusinessException("get all exist task failed:" + 
e.getMessage());
         }
-
+        return TaskResult.builder()
+                .dataConfigs(new ArrayList<>())
+                .cmdConfigs(new ArrayList<>())
+                .md5(taskResult.getMd5())
+                .code(AgentResponseCode.NO_UPDATE)
+                .build();
     }
 
     @Override
@@ -474,7 +462,7 @@ public class AgentServiceImpl implements AgentService {
         if (Objects.equals(request.getMd5(), configResult.getMd5())) {
             return ConfigResult.builder()
                     .md5(configResult.getMd5())
-                    .code(InstallerCode.NO_UPDATE)
+                    .code(AgentResponseCode.NO_UPDATE)
                     .build();
         }
         return configResult;
@@ -837,7 +825,7 @@ public class AgentServiceImpl implements AgentService {
         return sourceGroups.stream().anyMatch(clusterNodeGroups::contains);
     }
 
-    private List<StreamSourceEntity> fetchTask(TaskRequest request) {
+    private TaskResult fetchTask(TaskRequest request) {
         final String clusterName = request.getClusterName();
         final String ip = request.getAgentIp();
         final String uuid = request.getUuid();
@@ -850,7 +838,55 @@ public class AgentServiceImpl implements AgentService {
                 clusterName, ip, uuid);
         taskLists.addAll(stopSourceEntities);
         LOGGER.debug("success to add task : {}", taskLists.size());
-        return taskLists;
+        List<DataConfig> runningTaskConfig = Lists.newArrayList();
+        try {
+            List<CmdConfig> cmdConfigs = getAgentCmdConfigs(request);
+            if (CollectionUtils.isEmpty(taskLists)) {
+                return 
TaskResult.builder().dataConfigs(runningTaskConfig).cmdConfigs(cmdConfigs).build();
+            }
+            for (StreamSourceEntity sourceEntity : taskLists) {
+                int op = getOp(sourceEntity.getStatus());
+                DataConfig dataConfig = getDataConfig(sourceEntity, op);
+                runningTaskConfig.add(dataConfig);
+            }
+            TaskResult taskResult = 
TaskResult.builder().dataConfigs(runningTaskConfig).cmdConfigs(cmdConfigs).build();
+            String md5 = DigestUtils.md5Hex(GSON.toJson(taskResult));
+            taskResult.setMd5(md5);
+            taskResult.setCode(AgentResponseCode.SUCCESS);
+            return taskResult;
+        } catch (Exception e) {
+            LOGGER.error("get all exist task failed:", e);
+            throw new BusinessException("get all exist task failed:" + 
e.getMessage());
+        }
+    }
+
+    private AgentConfigInfo fetchAgentConfig(AgentConfigRequest request) {
+        LOGGER.debug("begin to get agent config info for {}", request);
+        AgentConfigInfo agentConfigInfo = new AgentConfigInfo();
+        Set<String> tagSet = new HashSet<>(16);
+        
tagSet.addAll(Arrays.asList(request.getClusterTag().split(InlongConstants.COMMA)));
+        List<String> clusterTagList = new ArrayList<>(tagSet);
+        ClusterPageRequest pageRequest = ClusterPageRequest.builder()
+                .type(ClusterType.AGENT_ZK)
+                .clusterTagList(clusterTagList)
+                .build();
+        List<InlongClusterEntity> agentZkCluster = 
clusterMapper.selectByCondition(pageRequest);
+        if (CollectionUtils.isNotEmpty(agentZkCluster)) {
+            agentConfigInfo.setZkUrl(agentZkCluster.get(0).getUrl());
+        }
+
+        AgentClusterInfo clusterInfo = (AgentClusterInfo) 
clusterService.getOne(
+                null, request.getClusterName(), ClusterType.AGENT);
+        agentConfigInfo.setCluster(AgentConfigInfo.AgentClusterInfo.builder()
+                .parentId(clusterInfo.getId())
+                .clusterName(clusterInfo.getName())
+                .build());
+        String jsonStr = GSON.toJson(agentConfigInfo);
+        String configMd5 = DigestUtils.md5Hex(jsonStr);
+        agentConfigInfo.setMd5(configMd5);
+        agentConfigInfo.setCode(AgentResponseCode.SUCCESS);
+        LOGGER.debug("success to get agent config info for: {}, result: {}", 
request, agentConfigInfo);
+        return agentConfigInfo;
     }
 
     private ConfigResult loadModuleConfigs(ConfigRequest request) {
@@ -870,7 +906,7 @@ public class AgentServiceImpl implements AgentService {
         String configMd5 = DigestUtils.md5Hex(jsonStr);
 
         ConfigResult configResult = 
ConfigResult.builder().moduleList(configs).md5(configMd5)
-                .code(InstallerCode.SUCCESS)
+                .code(AgentResponseCode.SUCCESS)
                 .build();
         LOGGER.info("success load module config, size = {}", 
configResult.getModuleList().size());
         return configResult;

Reply via email to