This is an automated email from the ASF dual-hosted git repository.
aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new f112ec05cf [INLONG-10558][Manager] Support determining whether to
issue agent tasks based on the MD5 value (#10559)
f112ec05cf is described below
commit f112ec05cf4045fed46de122e4b8756aec32247d
Author: fuweng11 <[email protected]>
AuthorDate: Thu Jul 4 19:28:54 2024 +0800
[INLONG-10558][Manager] Support determining whether to issue agent tasks
based on the MD5 value (#10559)
* [INLONG-10558][Manager] Support determining whether to issue agent tasks
based on the MD5 value
---
.../inlong/agent/installer/ManagerFetcher.java | 4 +-
.../inlong/common/pojo/agent/AgentConfigInfo.java | 6 +-
.../common/pojo/agent/AgentConfigRequest.java | 3 +
.../InstallerCode.java => AgentResponseCode.java} | 14 +--
.../inlong/common/pojo/agent/TaskRequest.java | 2 +
.../inlong/common/pojo/agent/TaskResult.java | 2 +
.../common/pojo/agent/installer/ConfigResult.java | 4 +-
.../service/core/impl/AgentServiceImpl.java | 126 +++++++++++++--------
8 files changed, 105 insertions(+), 56 deletions(-)
diff --git
a/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/ManagerFetcher.java
b/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/ManagerFetcher.java
index 68189dc6d1..222d2e9d83 100644
---
a/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/ManagerFetcher.java
+++
b/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/ManagerFetcher.java
@@ -23,9 +23,9 @@ import
org.apache.inlong.agent.installer.conf.InstallerConfiguration;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.HttpManager;
import org.apache.inlong.agent.utils.ThreadUtils;
+import org.apache.inlong.common.pojo.agent.AgentResponseCode;
import org.apache.inlong.common.pojo.agent.installer.ConfigRequest;
import org.apache.inlong.common.pojo.agent.installer.ConfigResult;
-import org.apache.inlong.common.pojo.agent.installer.InstallerCode;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
@@ -124,7 +124,7 @@ public class ManagerFetcher extends AbstractDaemon
implements ProfileFetcher {
while (isRunnable()) {
try {
ConfigResult config = getConfig();
- if (config != null &&
config.getCode().equals(InstallerCode.SUCCESS)) {
+ if (config != null &&
config.getCode().equals(AgentResponseCode.SUCCESS)) {
manager.getModuleManager().submitConfig(config);
}
} catch (Throwable ex) {
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/AgentConfigInfo.java
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/AgentConfigInfo.java
index 7d5f8ce4df..2399c9657c 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/AgentConfigInfo.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/AgentConfigInfo.java
@@ -26,11 +26,15 @@ import lombok.NoArgsConstructor;
* The Agent config info.
*/
@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
public class AgentConfigInfo {
+ AgentResponseCode code;
private String zkUrl;
-
private AgentClusterInfo cluster;
+ private String md5;
@Data
@Builder
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/AgentConfigRequest.java
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/AgentConfigRequest.java
index 857583bc86..282f1acf82 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/AgentConfigRequest.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/AgentConfigRequest.java
@@ -34,4 +34,7 @@ public class AgentConfigRequest {
private String clusterName;
private String ip;
+
+ private String md5;
+
}
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/installer/InstallerCode.java
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/AgentResponseCode.java
similarity index 78%
rename from
inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/installer/InstallerCode.java
rename to
inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/AgentResponseCode.java
index 60945e4894..826adee2b1 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/installer/InstallerCode.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/AgentResponseCode.java
@@ -15,9 +15,9 @@
* limitations under the License.
*/
-package org.apache.inlong.common.pojo.agent.installer;
+package org.apache.inlong.common.pojo.agent;
-public enum InstallerCode {
+public enum AgentResponseCode {
SUCCESS(0, "SUCCESS", "Get module config success"),
NO_UPDATE(1, "NO_UPDATE", "No update"),
@@ -27,16 +27,16 @@ public enum InstallerCode {
private final String name;
private final String desc;
- InstallerCode(int id, String name, String desc) {
+ AgentResponseCode(int id, String name, String desc) {
this.id = id;
this.name = name;
this.desc = desc;
}
- public static InstallerCode valueOf(int value) {
- for (InstallerCode installerCode : InstallerCode.values()) {
- if (installerCode.getId() == value) {
- return installerCode;
+ public static AgentResponseCode valueOf(int value) {
+ for (AgentResponseCode agentResponseCode : AgentResponseCode.values())
{
+ if (agentResponseCode.getId() == value) {
+ return agentResponseCode;
}
}
return UNKNOWN_ERROR;
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskRequest.java
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskRequest.java
index 7a98f2ac59..479e89a86d 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskRequest.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskRequest.java
@@ -38,6 +38,8 @@ public class TaskRequest {
private int pullJobType;
+ private String md5;
+
private List<CommandEntity> commandInfo = new ArrayList<>();
}
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskResult.java
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskResult.java
index 878a65dd12..2fcbec919d 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskResult.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskResult.java
@@ -35,5 +35,7 @@ public class TaskResult {
private List<CmdConfig> cmdConfigs;
private List<DataConfig> dataConfigs;
+ private String md5;
+ AgentResponseCode code;
}
\ No newline at end of file
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/installer/ConfigResult.java
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/installer/ConfigResult.java
index 14e504d211..b2c5426a81 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/installer/ConfigResult.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/installer/ConfigResult.java
@@ -17,6 +17,8 @@
package org.apache.inlong.common.pojo.agent.installer;
+import org.apache.inlong.common.pojo.agent.AgentResponseCode;
+
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
@@ -36,7 +38,7 @@ public class ConfigResult {
/**
* The code of the config result
*/
- InstallerCode code;
+ AgentResponseCode code;
/**
* The md5 of the config result
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index edb36d884f..37a19beb14 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -25,6 +25,7 @@ import org.apache.inlong.common.enums.TaskStateEnum;
import org.apache.inlong.common.enums.TaskTypeEnum;
import org.apache.inlong.common.pojo.agent.AgentConfigInfo;
import org.apache.inlong.common.pojo.agent.AgentConfigRequest;
+import org.apache.inlong.common.pojo.agent.AgentResponseCode;
import org.apache.inlong.common.pojo.agent.CmdConfig;
import org.apache.inlong.common.pojo.agent.DataConfig;
import org.apache.inlong.common.pojo.agent.TaskRequest;
@@ -32,7 +33,6 @@ import org.apache.inlong.common.pojo.agent.TaskResult;
import org.apache.inlong.common.pojo.agent.TaskSnapshotRequest;
import org.apache.inlong.common.pojo.agent.installer.ConfigRequest;
import org.apache.inlong.common.pojo.agent.installer.ConfigResult;
-import org.apache.inlong.common.pojo.agent.installer.InstallerCode;
import org.apache.inlong.common.pojo.agent.installer.ModuleConfig;
import org.apache.inlong.common.pojo.agent.installer.PackageConfig;
import org.apache.inlong.common.pojo.dataproxy.DataProxyTopicInfo;
@@ -145,7 +145,10 @@ public class AgentServiceImpl implements AgentService {
new CallerRunsPolicy());
@Getter
- private LoadingCache<TaskRequest, List<StreamSourceEntity>> taskCache;
+ private LoadingCache<TaskRequest, TaskResult> taskCache;
+ @Getter
+ private LoadingCache<AgentConfigRequest, AgentConfigInfo> agentConfigCache;
+
@Getter
private LoadingCache<ConfigRequest, ConfigResult> moduleConfigCache;
@@ -201,6 +204,9 @@ public class AgentServiceImpl implements AgentService {
taskCache = Caffeine.newBuilder()
.expireAfterWrite(expireTime * 2L, TimeUnit.SECONDS)
.build(this::fetchTask);
+ agentConfigCache = Caffeine.newBuilder()
+ .expireAfterWrite(expireTime * 2L, TimeUnit.SECONDS)
+ .build(this::fetchAgentConfig);
LOGGER.debug("start to reload config for installer.");
try {
moduleConfigCache = Caffeine.newBuilder()
@@ -331,28 +337,18 @@ public class AgentServiceImpl implements AgentService {
@Override
public AgentConfigInfo getAgentConfig(AgentConfigRequest request) {
LOGGER.debug("begin to get agent config info for {}", request);
- AgentConfigInfo agentConfigInfo = new AgentConfigInfo();
- Set<String> tagSet = new HashSet<>(16);
-
tagSet.addAll(Arrays.asList(request.getClusterTag().split(InlongConstants.COMMA)));
- List<String> clusterTagList = new ArrayList<>(tagSet);
- ClusterPageRequest pageRequest = ClusterPageRequest.builder()
- .type(ClusterType.AGENT_ZK)
- .clusterTagList(clusterTagList)
- .build();
- List<InlongClusterEntity> agentZkCluster =
clusterMapper.selectByCondition(pageRequest);
- if (CollectionUtils.isNotEmpty(agentZkCluster)) {
- agentConfigInfo.setZkUrl(agentZkCluster.get(0).getUrl());
+ AgentConfigInfo agentConfigInfo = agentConfigCache.get(request);
+ if (agentConfigInfo == null) {
+ return null;
+ }
+ if (request.getMd5() == null || !Objects.equals(request.getMd5(),
agentConfigInfo.getMd5())) {
+ return agentConfigInfo;
}
-
- AgentClusterInfo clusterInfo = (AgentClusterInfo)
clusterService.getOne(
- null, request.getClusterName(), ClusterType.AGENT);
- agentConfigInfo.setCluster(AgentConfigInfo.AgentClusterInfo.builder()
- .parentId(clusterInfo.getId())
- .clusterName(clusterInfo.getName())
- .build());
-
LOGGER.debug("success to get agent config info for: {}, result: {}",
request, agentConfigInfo);
- return agentConfigInfo;
+ return AgentConfigInfo.builder()
+ .md5(agentConfigInfo.getMd5())
+ .code(AgentResponseCode.NO_UPDATE)
+ .build();
}
@Override
@@ -374,27 +370,19 @@ public class AgentServiceImpl implements AgentService {
@Override
public TaskResult getExistTaskConfig(TaskRequest request) {
LOGGER.debug("begin to get all exist task by request={}", request);
- // Query pending special commands
- List<DataConfig> runningTaskConfig = Lists.newArrayList();
- List<StreamSourceEntity> sourceEntities = taskCache.get(request);
- try {
- List<CmdConfig> cmdConfigs = getAgentCmdConfigs(request);
- if (CollectionUtils.isEmpty(sourceEntities)) {
- return
TaskResult.builder().dataConfigs(runningTaskConfig).cmdConfigs(cmdConfigs).build();
- }
- for (StreamSourceEntity sourceEntity : sourceEntities) {
- int op = getOp(sourceEntity.getStatus());
- DataConfig dataConfig = getDataConfig(sourceEntity, op);
- runningTaskConfig.add(dataConfig);
- }
- TaskResult taskResult =
TaskResult.builder().dataConfigs(runningTaskConfig).cmdConfigs(cmdConfigs).build();
-
+ TaskResult taskResult = taskCache.get(request);
+ if (taskResult == null) {
+ return null;
+ }
+ if (request.getMd5() == null || !Objects.equals(request.getMd5(),
taskResult.getMd5())) {
return taskResult;
- } catch (Exception e) {
- LOGGER.error("get all exist task failed:", e);
- throw new BusinessException("get all exist task failed:" +
e.getMessage());
}
-
+ return TaskResult.builder()
+ .dataConfigs(new ArrayList<>())
+ .cmdConfigs(new ArrayList<>())
+ .md5(taskResult.getMd5())
+ .code(AgentResponseCode.NO_UPDATE)
+ .build();
}
@Override
@@ -474,7 +462,7 @@ public class AgentServiceImpl implements AgentService {
if (Objects.equals(request.getMd5(), configResult.getMd5())) {
return ConfigResult.builder()
.md5(configResult.getMd5())
- .code(InstallerCode.NO_UPDATE)
+ .code(AgentResponseCode.NO_UPDATE)
.build();
}
return configResult;
@@ -837,7 +825,7 @@ public class AgentServiceImpl implements AgentService {
return sourceGroups.stream().anyMatch(clusterNodeGroups::contains);
}
- private List<StreamSourceEntity> fetchTask(TaskRequest request) {
+ private TaskResult fetchTask(TaskRequest request) {
final String clusterName = request.getClusterName();
final String ip = request.getAgentIp();
final String uuid = request.getUuid();
@@ -850,7 +838,55 @@ public class AgentServiceImpl implements AgentService {
clusterName, ip, uuid);
taskLists.addAll(stopSourceEntities);
LOGGER.debug("success to add task : {}", taskLists.size());
- return taskLists;
+ List<DataConfig> runningTaskConfig = Lists.newArrayList();
+ try {
+ List<CmdConfig> cmdConfigs = getAgentCmdConfigs(request);
+ if (CollectionUtils.isEmpty(taskLists)) {
+ return
TaskResult.builder().dataConfigs(runningTaskConfig).cmdConfigs(cmdConfigs).build();
+ }
+ for (StreamSourceEntity sourceEntity : taskLists) {
+ int op = getOp(sourceEntity.getStatus());
+ DataConfig dataConfig = getDataConfig(sourceEntity, op);
+ runningTaskConfig.add(dataConfig);
+ }
+ TaskResult taskResult =
TaskResult.builder().dataConfigs(runningTaskConfig).cmdConfigs(cmdConfigs).build();
+ String md5 = DigestUtils.md5Hex(GSON.toJson(taskResult));
+ taskResult.setMd5(md5);
+ taskResult.setCode(AgentResponseCode.SUCCESS);
+ return taskResult;
+ } catch (Exception e) {
+ LOGGER.error("get all exist task failed:", e);
+ throw new BusinessException("get all exist task failed:" +
e.getMessage());
+ }
+ }
+
+ private AgentConfigInfo fetchAgentConfig(AgentConfigRequest request) {
+ LOGGER.debug("begin to get agent config info for {}", request);
+ AgentConfigInfo agentConfigInfo = new AgentConfigInfo();
+ Set<String> tagSet = new HashSet<>(16);
+
tagSet.addAll(Arrays.asList(request.getClusterTag().split(InlongConstants.COMMA)));
+ List<String> clusterTagList = new ArrayList<>(tagSet);
+ ClusterPageRequest pageRequest = ClusterPageRequest.builder()
+ .type(ClusterType.AGENT_ZK)
+ .clusterTagList(clusterTagList)
+ .build();
+ List<InlongClusterEntity> agentZkCluster =
clusterMapper.selectByCondition(pageRequest);
+ if (CollectionUtils.isNotEmpty(agentZkCluster)) {
+ agentConfigInfo.setZkUrl(agentZkCluster.get(0).getUrl());
+ }
+
+ AgentClusterInfo clusterInfo = (AgentClusterInfo)
clusterService.getOne(
+ null, request.getClusterName(), ClusterType.AGENT);
+ agentConfigInfo.setCluster(AgentConfigInfo.AgentClusterInfo.builder()
+ .parentId(clusterInfo.getId())
+ .clusterName(clusterInfo.getName())
+ .build());
+ String jsonStr = GSON.toJson(agentConfigInfo);
+ String configMd5 = DigestUtils.md5Hex(jsonStr);
+ agentConfigInfo.setMd5(configMd5);
+ agentConfigInfo.setCode(AgentResponseCode.SUCCESS);
+ LOGGER.debug("success to get agent config info for: {}, result: {}",
request, agentConfigInfo);
+ return agentConfigInfo;
}
private ConfigResult loadModuleConfigs(ConfigRequest request) {
@@ -870,7 +906,7 @@ public class AgentServiceImpl implements AgentService {
String configMd5 = DigestUtils.md5Hex(jsonStr);
ConfigResult configResult =
ConfigResult.builder().moduleList(configs).md5(configMd5)
- .code(InstallerCode.SUCCESS)
+ .code(AgentResponseCode.SUCCESS)
.build();
LOGGER.info("success load module config, size = {}",
configResult.getModuleList().size());
return configResult;