This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new e4d5007fc5 [INLONG-10601][Manager] Optimize the agent task
configuration process (#10602)
e4d5007fc5 is described below
commit e4d5007fc56240f41f9233b440c6cde75942c69f
Author: fuweng11 <[email protected]>
AuthorDate: Thu Jul 11 19:19:19 2024 +0800
[INLONG-10601][Manager] Optimize the agent task configuration process
(#10602)
---
.../inlong/common/pojo/agent/AgentConfigInfo.java | 1 +
.../inlong/common/pojo/agent/TaskResult.java | 1 +
.../manager/dao/entity/AgentTaskConfigEntity.java | 39 ++-
.../dao/mapper/AgentTaskConfigEntityMapper.java | 45 ++++
.../mappers/AgentTaskConfigEntityMapper.xml | 100 ++++++++
.../{SortConfigLoader.java => ConfigLoader.java} | 10 +-
.../service/core/impl/AgentServiceImpl.java | 136 +++++-----
...ConfigLoaderImpl.java => ConfigLoaderImpl.java} | 17 +-
.../service/core/impl/SortClusterServiceImpl.java | 16 +-
.../manager/service/core/impl/SortServiceImpl.java | 6 +-
.../service/core/impl/SortSourceServiceImpl.java | 6 +-
.../listener/StreamTaskListenerFactory.java | 4 +
.../listener/source/SourceStartListener.java | 86 +++++++
.../repository/DataProxyConfigRepository.java | 8 +-
.../service/source/AbstractSourceOperator.java | 275 +++++++++++++++++++++
.../service/source/StreamSourceOperator.java | 8 +
.../service/stream/TemplateServiceImpl.java | 4 +-
.../main/resources/h2/apache_inlong_manager.sql | 20 ++
.../manager-web/sql/apache_inlong_manager.sql | 21 ++
inlong-manager/manager-web/sql/changes-1.13.0.sql | 21 ++
20 files changed, 703 insertions(+), 121 deletions(-)
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/AgentConfigInfo.java
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/AgentConfigInfo.java
index 2399c9657c..ff8cd4df3e 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/AgentConfigInfo.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/AgentConfigInfo.java
@@ -34,6 +34,7 @@ public class AgentConfigInfo {
AgentResponseCode code;
private String zkUrl;
private AgentClusterInfo cluster;
+ private Integer version;
private String md5;
@Data
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskResult.java
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskResult.java
index 2fcbec919d..7970e345cb 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskResult.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskResult.java
@@ -36,6 +36,7 @@ public class TaskResult {
private List<CmdConfig> cmdConfigs;
private List<DataConfig> dataConfigs;
private String md5;
+ private Integer version;
AgentResponseCode code;
}
\ No newline at end of file
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/AgentConfigInfo.java
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/AgentTaskConfigEntity.java
similarity index 58%
copy from
inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/AgentConfigInfo.java
copy to
inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/AgentTaskConfigEntity.java
index 2399c9657c..cc8c3ab529 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/AgentConfigInfo.java
+++
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/AgentTaskConfigEntity.java
@@ -15,36 +15,33 @@
* limitations under the License.
*/
-package org.apache.inlong.common.pojo.agent;
+package org.apache.inlong.manager.dao.entity;
-import lombok.AllArgsConstructor;
-import lombok.Builder;
import lombok.Data;
-import lombok.NoArgsConstructor;
+
+import java.io.Serializable;
+import java.util.Date;
/**
- * The Agent config info.
+ * Agent task config entity, including agent ip, cluster name, etc.
*/
@Data
-@Builder
-@NoArgsConstructor
-@AllArgsConstructor
-public class AgentConfigInfo {
+public class AgentTaskConfigEntity implements Serializable {
- AgentResponseCode code;
- private String zkUrl;
- private AgentClusterInfo cluster;
- private String md5;
+ private static final long serialVersionUID = 1L;
+ private Integer id;
+ private String clusterName;
+ private String agentIp;
- @Data
- @Builder
- @NoArgsConstructor
- @AllArgsConstructor
- public static class AgentClusterInfo {
+ private String configParams;
- private Integer parentId;
+ private String taskParams;
- private String clusterName;
+ private Integer isDeleted;
+ private String creator;
+ private String modifier;
+ private Date createTime;
+ private Date modifyTime;
+ private Integer version;
- }
}
diff --git
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/AgentTaskConfigEntityMapper.java
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/AgentTaskConfigEntityMapper.java
new file mode 100644
index 0000000000..d673a16eac
--- /dev/null
+++
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/AgentTaskConfigEntityMapper.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.dao.mapper;
+
+import org.apache.inlong.manager.common.tenant.MultiTenantQuery;
+import org.apache.inlong.manager.dao.entity.AgentTaskConfigEntity;
+
+import org.apache.ibatis.annotations.Options;
+import org.apache.ibatis.annotations.Param;
+import org.apache.ibatis.cursor.Cursor;
+import org.apache.ibatis.mapping.ResultSetType;
+import org.springframework.stereotype.Repository;
+
+@Repository
+public interface AgentTaskConfigEntityMapper {
+
+ int insert(AgentTaskConfigEntity record);
+
+ AgentTaskConfigEntity selectByPrimaryKey(Integer id);
+
+ AgentTaskConfigEntity selectByIdentifier(@Param("agentIp") String agentIp,
+ @Param("clusterName") String clusterName);
+
+ int updateByIdSelective(AgentTaskConfigEntity record);
+
+ @MultiTenantQuery(with = false)
+ @Options(resultSetType = ResultSetType.FORWARD_ONLY, fetchSize =
Integer.MIN_VALUE)
+ Cursor<AgentTaskConfigEntity> selectAllAgentTaskConfigs();
+
+}
diff --git
a/inlong-manager/manager-dao/src/main/resources/mappers/AgentTaskConfigEntityMapper.xml
b/inlong-manager/manager-dao/src/main/resources/mappers/AgentTaskConfigEntityMapper.xml
new file mode 100644
index 0000000000..9a5f237c8d
--- /dev/null
+++
b/inlong-manager/manager-dao/src/main/resources/mappers/AgentTaskConfigEntityMapper.xml
@@ -0,0 +1,100 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper
namespace="org.apache.inlong.manager.dao.mapper.AgentTaskConfigEntityMapper">
+ <resultMap id="BaseResultMap"
type="org.apache.inlong.manager.dao.entity.AgentTaskConfigEntity">
+ <id column="id" jdbcType="INTEGER" property="id"/>
+ <result column="agent_ip" jdbcType="VARCHAR" property="agentIp"/>
+ <result column="cluster_name" jdbcType="VARCHAR"
property="clusterName"/>
+ <result column="config_params" jdbcType="VARCHAR"
property="configParams"/>
+ <result column="task_params" jdbcType="VARCHAR" property="taskParams"/>
+ <result column="is_deleted" jdbcType="INTEGER" property="isDeleted"/>
+ <result column="creator" jdbcType="VARCHAR" property="creator"/>
+ <result column="modifier" jdbcType="VARCHAR" property="modifier"/>
+ <result column="create_time" jdbcType="TIMESTAMP"
property="createTime"/>
+ <result column="modify_time" jdbcType="TIMESTAMP"
property="modifyTime"/>
+ <result column="version" jdbcType="INTEGER" property="version"/>
+ </resultMap>
+
+ <sql id="Base_Column_List">
+ id, agent_ip, cluster_name, config_params, task_params, is_deleted,
creator, modifier, create_time, modify_time, version
+ </sql>
+ <insert id="insert" useGeneratedKeys="true" keyProperty="id"
+
parameterType="org.apache.inlong.manager.dao.entity.AgentTaskConfigEntity">
+ insert into agent_task_config (id, agent_ip, cluster_name,
+ config_params, task_params,
+ creator, modifier)
+ values (#{id, jdbcType=INTEGER}, #{agentIp, jdbcType=VARCHAR},
#{clusterName, jdbcType=VARCHAR},
+ #{configParams, jdbcType=VARCHAR}, #{taskParams,
jdbcType=VARCHAR},
+ #{creator, jdbcType=VARCHAR}, #{modifier, jdbcType=VARCHAR})
+ </insert>
+
+ <select id="selectByPrimaryKey" parameterType="java.lang.Integer"
resultMap="BaseResultMap">
+ select
+ <include refid="Base_Column_List"/>
+ from agent_task_config
+ where id = #{id,jdbcType=INTEGER}
+ and is_deleted = 0
+ </select>
+ <select id="selectByIdentifier"
resultType="org.apache.inlong.manager.dao.entity.AgentTaskConfigEntity">
+ select
+ <include refid="Base_Column_List"/>
+ from agent_task_config
+ where agent_ip = #{agentIp,jdbcType=VARCHAR}
+ and cluster_name = #{clusterName, jdbcType=VARCHAR}
+ and is_deleted = 0
+ </select>
+ <select id="selectAllAgentTaskConfigs"
resultType="org.apache.inlong.manager.dao.entity.AgentTaskConfigEntity">
+ select
+ <include refid="Base_Column_List"/>
+ from agent_task_config
+ <where>
+ and is_deleted = 0
+ and agent_ip is not null
+ </where>
+ </select>
+ <update id="updateByIdSelective"
parameterType="org.apache.inlong.manager.dao.entity.AgentTaskConfigEntity">
+ update agent_task_config
+ <set>
+ <if test="agentIp != null">
+ agent_ip = #{agentIp,jdbcType=VARCHAR},
+ </if>
+ <if test="clusterName != null">
+ cluster_name = #{clusterName,jdbcType=VARCHAR},
+ </if>
+ <if test="configParams != null">
+ config_params = #{configParams,jdbcType=VARCHAR},
+ </if>
+ <if test="taskParams != null">
+ task_params = #{taskParams,jdbcType=VARCHAR},
+ </if>
+ <if test="isDeleted != null">
+ is_deleted = #{isDeleted,jdbcType=INTEGER},
+ </if>
+ <if test="modifier != null">
+ modifier = #{modifier,jdbcType=VARCHAR},
+ </if>
+ version = #{version,jdbcType=INTEGER} + 1
+ </set>
+ where id = #{id,jdbcType=INTEGER}
+ and version = #{version,jdbcType=INTEGER}
+ </update>
+</mapper>
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortConfigLoader.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/ConfigLoader.java
similarity index 93%
rename from
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortConfigLoader.java
rename to
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/ConfigLoader.java
index 2923e6f34d..a491838c68 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortConfigLoader.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/ConfigLoader.java
@@ -17,6 +17,7 @@
package org.apache.inlong.manager.service.core;
+import org.apache.inlong.manager.dao.entity.AgentTaskConfigEntity;
import org.apache.inlong.manager.dao.entity.ClusterConfigEntity;
import org.apache.inlong.manager.dao.entity.DataNodeEntity;
import org.apache.inlong.manager.dao.entity.InlongGroupExtEntity;
@@ -35,7 +36,7 @@ import java.util.List;
/**
* Loader for sort service to load configs through Cursor
*/
-public interface SortConfigLoader {
+public interface ConfigLoader {
/**
* Load all clusters by cursor
@@ -124,4 +125,11 @@ public interface SortConfigLoader {
*/
List<ClusterConfigEntity> loadAllClusterConfigEntity();
+ /**
+ * Load all agent task config info
+ *
+ * @return List of agent task config info
+ */
+ List<AgentTaskConfigEntity> loadAllAgentTaskConfigEntity();
+
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index 37a19beb14..f7d669151a 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -47,6 +47,7 @@ import
org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.AgentTaskConfigEntity;
import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
import org.apache.inlong.manager.dao.entity.InlongClusterNodeEntity;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
@@ -63,7 +64,6 @@ import
org.apache.inlong.manager.dao.mapper.ModuleConfigEntityMapper;
import org.apache.inlong.manager.dao.mapper.PackageConfigEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
import org.apache.inlong.manager.pojo.cluster.ClusterPageRequest;
-import org.apache.inlong.manager.pojo.cluster.agent.AgentClusterInfo;
import
org.apache.inlong.manager.pojo.cluster.agent.AgentClusterNodeBindGroupRequest;
import org.apache.inlong.manager.pojo.cluster.agent.AgentClusterNodeDTO;
import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterDTO;
@@ -73,6 +73,7 @@ import
org.apache.inlong.manager.pojo.source.file.FileSourceDTO;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.service.cluster.InlongClusterService;
import org.apache.inlong.manager.service.core.AgentService;
+import org.apache.inlong.manager.service.core.ConfigLoader;
import org.apache.inlong.manager.service.source.SourceOperatorFactory;
import org.apache.inlong.manager.service.source.SourceSnapshotOperator;
import org.apache.inlong.manager.service.source.StreamSourceOperator;
@@ -107,9 +108,11 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
+import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -144,10 +147,8 @@ public class AgentServiceImpl implements AgentService {
new ThreadFactoryBuilder().setNameFormat("async-agent-%s").build(),
new CallerRunsPolicy());
- @Getter
- private LoadingCache<TaskRequest, TaskResult> taskCache;
- @Getter
- private LoadingCache<AgentConfigRequest, AgentConfigInfo> agentConfigCache;
+ private Map<String, TaskResult> taskConfigMap = new ConcurrentHashMap<>();
+ private Map<String, AgentConfigInfo> agentConfigMap = new
ConcurrentHashMap<>();
@Getter
private LoadingCache<ConfigRequest, ConfigResult> moduleConfigCache;
@@ -191,6 +192,8 @@ public class AgentServiceImpl implements AgentService {
private PackageConfigEntityMapper packageConfigEntityMapper;
@Autowired
private InlongClusterService clusterService;
+ @Autowired
+ private ConfigLoader configLoader;
/**
* Start the update task
@@ -201,12 +204,6 @@ public class AgentServiceImpl implements AgentService {
// The expiry time of cluster info cache must be greater than
taskCache cache
// because the eviction handler needs to query cluster info cache
long expireTime = 10 * 5;
- taskCache = Caffeine.newBuilder()
- .expireAfterWrite(expireTime * 2L, TimeUnit.SECONDS)
- .build(this::fetchTask);
- agentConfigCache = Caffeine.newBuilder()
- .expireAfterWrite(expireTime * 2L, TimeUnit.SECONDS)
- .build(this::fetchAgentConfig);
LOGGER.debug("start to reload config for installer.");
try {
moduleConfigCache = Caffeine.newBuilder()
@@ -215,6 +212,12 @@ public class AgentServiceImpl implements AgentService {
} catch (Throwable t) {
LOGGER.error("fail to reload all config for installer ", t);
}
+ try {
+ reload();
+ setReloadTimer();
+ } catch (Exception e) {
+ LOGGER.error("load agent task config failed", e);
+ }
LOGGER.debug("end to reload config for installer");
if (updateTaskTimeoutEnabled) {
ThreadFactory factory = new ThreadFactoryBuilder()
@@ -266,6 +269,11 @@ public class AgentServiceImpl implements AgentService {
}
}
+ private void setReloadTimer() {
+ ScheduledExecutorService executorService =
Executors.newSingleThreadScheduledExecutor();
+ executorService.scheduleWithFixedDelay(this::reload, 60000L, 60000L,
TimeUnit.MILLISECONDS);
+ }
+
@Override
public Boolean reportSnapshot(TaskSnapshotRequest request) {
return snapshotOperator.snapshot(request);
@@ -293,6 +301,42 @@ public class AgentServiceImpl implements AgentService {
}
}
+ public void reload() {
+ LOGGER.debug("start to reload agent task config.");
+ try {
+ Map<String, TaskResult> newTaskConfigMap = new
ConcurrentHashMap<>();
+ Map<String, AgentConfigInfo> newAgentConfigMap = new
ConcurrentHashMap<>();
+ List<AgentTaskConfigEntity> agentTaskConfigEntityList =
configLoader.loadAllAgentTaskConfigEntity();
+ agentTaskConfigEntityList.forEach(agentTaskConfigEntity -> {
+ try {
+ String key = agentTaskConfigEntity.getAgentIp() +
InlongConstants.UNDERSCORE
+ + agentTaskConfigEntity.getClusterName();
+ TaskResult taskResult =
JsonUtils.parseObject(agentTaskConfigEntity.getTaskParams(),
+ TaskResult.class);
+ if (taskResult != null) {
+
taskResult.setVersion(agentTaskConfigEntity.getVersion());
+ newTaskConfigMap.putIfAbsent(key, taskResult);
+ }
+ AgentConfigInfo agentConfigInfo =
JsonUtils.parseObject(agentTaskConfigEntity.getConfigParams(),
+ AgentConfigInfo.class);
+ if (agentConfigInfo != null) {
+
agentConfigInfo.setVersion(agentTaskConfigEntity.getVersion());
+ newAgentConfigMap.putIfAbsent(key, agentConfigInfo);
+ }
+ } catch (Exception e) {
+ LOGGER.error("failed to get agent task config for agent
ip={}, cluster name={}",
+ agentTaskConfigEntity.getAgentIp(),
agentTaskConfigEntity.getClusterName());
+ }
+
+ });
+ taskConfigMap = newTaskConfigMap;
+ agentConfigMap = newAgentConfigMap;
+ } catch (Throwable t) {
+ LOGGER.error("failed to reload all agent task config", t);
+ }
+ LOGGER.debug("end to reload agent task config");
+ }
+
/**
* Update task status by command.
*
@@ -337,7 +381,8 @@ public class AgentServiceImpl implements AgentService {
@Override
public AgentConfigInfo getAgentConfig(AgentConfigRequest request) {
LOGGER.debug("begin to get agent config info for {}", request);
- AgentConfigInfo agentConfigInfo = agentConfigCache.get(request);
+ String key = request.getIp() + InlongConstants.UNDERSCORE +
request.getClusterName();
+ AgentConfigInfo agentConfigInfo = agentConfigMap.get(key);
if (agentConfigInfo == null) {
return null;
}
@@ -370,7 +415,8 @@ public class AgentServiceImpl implements AgentService {
@Override
public TaskResult getExistTaskConfig(TaskRequest request) {
LOGGER.debug("begin to get all exist task by request={}", request);
- TaskResult taskResult = taskCache.get(request);
+ String key = request.getAgentIp() + InlongConstants.UNDERSCORE +
request.getClusterName();
+ TaskResult taskResult = taskConfigMap.get(key);
if (taskResult == null) {
return null;
}
@@ -825,70 +871,6 @@ public class AgentServiceImpl implements AgentService {
return sourceGroups.stream().anyMatch(clusterNodeGroups::contains);
}
- private TaskResult fetchTask(TaskRequest request) {
- final String clusterName = request.getClusterName();
- final String ip = request.getAgentIp();
- final String uuid = request.getUuid();
- List<StreamSourceEntity> normalSourceEntities =
sourceMapper.selectByStatusAndCluster(
-
SourceStatus.NORMAL_STATUS_SET.stream().map(SourceStatus::getCode).collect(Collectors.toList()),
- clusterName, ip, uuid);
- List<StreamSourceEntity> taskLists = new
ArrayList<>(normalSourceEntities);
- List<StreamSourceEntity> stopSourceEntities =
sourceMapper.selectByStatusAndCluster(
-
SourceStatus.STOP_STATUS_SET.stream().map(SourceStatus::getCode).collect(Collectors.toList()),
- clusterName, ip, uuid);
- taskLists.addAll(stopSourceEntities);
- LOGGER.debug("success to add task : {}", taskLists.size());
- List<DataConfig> runningTaskConfig = Lists.newArrayList();
- try {
- List<CmdConfig> cmdConfigs = getAgentCmdConfigs(request);
- if (CollectionUtils.isEmpty(taskLists)) {
- return
TaskResult.builder().dataConfigs(runningTaskConfig).cmdConfigs(cmdConfigs).build();
- }
- for (StreamSourceEntity sourceEntity : taskLists) {
- int op = getOp(sourceEntity.getStatus());
- DataConfig dataConfig = getDataConfig(sourceEntity, op);
- runningTaskConfig.add(dataConfig);
- }
- TaskResult taskResult =
TaskResult.builder().dataConfigs(runningTaskConfig).cmdConfigs(cmdConfigs).build();
- String md5 = DigestUtils.md5Hex(GSON.toJson(taskResult));
- taskResult.setMd5(md5);
- taskResult.setCode(AgentResponseCode.SUCCESS);
- return taskResult;
- } catch (Exception e) {
- LOGGER.error("get all exist task failed:", e);
- throw new BusinessException("get all exist task failed:" +
e.getMessage());
- }
- }
-
- private AgentConfigInfo fetchAgentConfig(AgentConfigRequest request) {
- LOGGER.debug("begin to get agent config info for {}", request);
- AgentConfigInfo agentConfigInfo = new AgentConfigInfo();
- Set<String> tagSet = new HashSet<>(16);
-
tagSet.addAll(Arrays.asList(request.getClusterTag().split(InlongConstants.COMMA)));
- List<String> clusterTagList = new ArrayList<>(tagSet);
- ClusterPageRequest pageRequest = ClusterPageRequest.builder()
- .type(ClusterType.AGENT_ZK)
- .clusterTagList(clusterTagList)
- .build();
- List<InlongClusterEntity> agentZkCluster =
clusterMapper.selectByCondition(pageRequest);
- if (CollectionUtils.isNotEmpty(agentZkCluster)) {
- agentConfigInfo.setZkUrl(agentZkCluster.get(0).getUrl());
- }
-
- AgentClusterInfo clusterInfo = (AgentClusterInfo)
clusterService.getOne(
- null, request.getClusterName(), ClusterType.AGENT);
- agentConfigInfo.setCluster(AgentConfigInfo.AgentClusterInfo.builder()
- .parentId(clusterInfo.getId())
- .clusterName(clusterInfo.getName())
- .build());
- String jsonStr = GSON.toJson(agentConfigInfo);
- String configMd5 = DigestUtils.md5Hex(jsonStr);
- agentConfigInfo.setMd5(configMd5);
- agentConfigInfo.setCode(AgentResponseCode.SUCCESS);
- LOGGER.debug("success to get agent config info for: {}, result: {}",
request, agentConfigInfo);
- return agentConfigInfo;
- }
-
private ConfigResult loadModuleConfigs(ConfigRequest request) {
final String clusterName = request.getClusterName();
final String ip = request.getLocalIp();
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortConfigLoaderImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConfigLoaderImpl.java
similarity index 90%
rename from
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortConfigLoaderImpl.java
rename to
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConfigLoaderImpl.java
index 03b5f24977..649586bb21 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortConfigLoaderImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConfigLoaderImpl.java
@@ -17,12 +17,14 @@
package org.apache.inlong.manager.service.core.impl;
+import org.apache.inlong.manager.dao.entity.AgentTaskConfigEntity;
import org.apache.inlong.manager.dao.entity.ClusterConfigEntity;
import org.apache.inlong.manager.dao.entity.DataNodeEntity;
import org.apache.inlong.manager.dao.entity.InlongGroupExtEntity;
import org.apache.inlong.manager.dao.entity.InlongStreamExtEntity;
import org.apache.inlong.manager.dao.entity.SortConfigEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
+import org.apache.inlong.manager.dao.mapper.AgentTaskConfigEntityMapper;
import org.apache.inlong.manager.dao.mapper.ClusterConfigEntityMapper;
import org.apache.inlong.manager.dao.mapper.DataNodeEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper;
@@ -39,7 +41,7 @@ import
org.apache.inlong.manager.pojo.sort.standalone.SortSourceGroupInfo;
import org.apache.inlong.manager.pojo.sort.standalone.SortSourceStreamInfo;
import org.apache.inlong.manager.pojo.sort.standalone.SortSourceStreamSinkInfo;
import org.apache.inlong.manager.pojo.sort.standalone.SortTaskInfo;
-import org.apache.inlong.manager.service.core.SortConfigLoader;
+import org.apache.inlong.manager.service.core.ConfigLoader;
import org.apache.ibatis.cursor.Cursor;
import org.springframework.beans.factory.annotation.Autowired;
@@ -50,7 +52,7 @@ import java.util.ArrayList;
import java.util.List;
@Service
-public class SortConfigLoaderImpl implements SortConfigLoader {
+public class ConfigLoaderImpl implements ConfigLoader {
@Autowired
private InlongClusterEntityMapper clusterEntityMapper;
@@ -72,6 +74,8 @@ public class SortConfigLoaderImpl implements SortConfigLoader
{
private SortConfigEntityMapper sortConfigEntityMapper;
@Autowired
private ClusterConfigEntityMapper clusterConfigEntityMapper;
+ @Autowired
+ private AgentTaskConfigEntityMapper agentTaskConfigEntityMapper;
@Transactional
@Override
@@ -180,4 +184,13 @@ public class SortConfigLoaderImpl implements
SortConfigLoader {
cursor.forEach(allClusterConfigs::add);
return allClusterConfigs;
}
+
+ @Transactional
+ @Override
+ public List<AgentTaskConfigEntity> loadAllAgentTaskConfigEntity() {
+ Cursor<AgentTaskConfigEntity> cursor =
agentTaskConfigEntityMapper.selectAllAgentTaskConfigs();
+ List<AgentTaskConfigEntity> agentTaskConfigEntityList = new
ArrayList<>();
+ cursor.forEach(agentTaskConfigEntityList::add);
+ return agentTaskConfigEntityList;
+ }
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterServiceImpl.java
index 24ee49f757..36328007e1 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterServiceImpl.java
@@ -26,8 +26,8 @@ import org.apache.inlong.manager.pojo.node.DataNodeInfo;
import org.apache.inlong.manager.pojo.sort.standalone.SortFieldInfo;
import org.apache.inlong.manager.pojo.sort.standalone.SortSourceStreamInfo;
import org.apache.inlong.manager.pojo.sort.standalone.SortTaskInfo;
+import org.apache.inlong.manager.service.core.ConfigLoader;
import org.apache.inlong.manager.service.core.SortClusterService;
-import org.apache.inlong.manager.service.core.SortConfigLoader;
import org.apache.inlong.manager.service.node.DataNodeOperator;
import org.apache.inlong.manager.service.node.DataNodeOperatorFactory;
import org.apache.inlong.manager.service.sink.SinkOperatorFactory;
@@ -93,7 +93,7 @@ public class SortClusterServiceImpl implements
SortClusterService {
private long reloadInterval;
@Autowired
- private SortConfigLoader sortConfigLoader;
+ private ConfigLoader configLoader;
@Autowired
private SinkOperatorFactory sinkOperatorFactory;
@Autowired
@@ -171,16 +171,16 @@ public class SortClusterServiceImpl implements
SortClusterService {
private void reloadAllClusterConfig() {
// load all fields info
- List<SortFieldInfo> fieldInfos = sortConfigLoader.loadAllFields();
+ List<SortFieldInfo> fieldInfos = configLoader.loadAllFields();
fieldMap = new HashMap<>();
fieldInfos.forEach(info -> {
List<String> fields = fieldMap.computeIfAbsent(info.getSinkId(), k
-> new ArrayList<>());
fields.add(info.getFieldName());
});
- List<StreamSinkEntity> sinkEntities =
sortConfigLoader.loadAllStreamSinkEntity();
+ List<StreamSinkEntity> sinkEntities =
configLoader.loadAllStreamSinkEntity();
// get all task under a given cluster, has been reduced into cluster
and task.
- List<SortTaskInfo> tasks = sortConfigLoader.loadAllTask();
+ List<SortTaskInfo> tasks = configLoader.loadAllTask();
Map<String, List<SortTaskInfo>> clusterTaskMap = tasks.stream()
.filter(dto -> StringUtils.isNotBlank(dto.getSortClusterName())
&& StringUtils.isNotBlank(dto.getSortTaskName())
@@ -189,7 +189,7 @@ public class SortClusterServiceImpl implements
SortClusterService {
.collect(Collectors.groupingBy(SortTaskInfo::getSortClusterName));
// reload all streams
- allStreams = sortConfigLoader.loadAllStreams()
+ allStreams = configLoader.loadAllStreams()
.stream()
.collect(Collectors.groupingBy(SortSourceStreamInfo::getInlongGroupId,
Collectors.toMap(SortSourceStreamInfo::getInlongStreamId, info -> info)));
@@ -202,7 +202,7 @@ public class SortClusterServiceImpl implements
SortClusterService {
.collect(Collectors.groupingBy(StreamSinkEntity::getSortTaskName));
// get all data nodes and group by node name
- List<DataNodeEntity> dataNodeEntities =
sortConfigLoader.loadAllDataNodeEntity();
+ List<DataNodeEntity> dataNodeEntities =
configLoader.loadAllDataNodeEntity();
Map<String, DataNodeInfo> task2DataNodeMap = dataNodeEntities.stream()
.filter(entity -> StringUtils.isNotBlank(entity.getName()))
.map(entity -> {
@@ -300,6 +300,6 @@ public class SortClusterServiceImpl implements
SortClusterService {
*/
private void setReloadTimer() {
ScheduledExecutorService executorService =
Executors.newSingleThreadScheduledExecutor();
- executorService.scheduleAtFixedRate(this::reload, reloadInterval,
reloadInterval, TimeUnit.MILLISECONDS);
+ executorService.scheduleWithFixedDelay(this::reload, reloadInterval,
reloadInterval, TimeUnit.MILLISECONDS);
}
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
index 2f87d3b92e..d01911acb9 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
@@ -42,8 +42,8 @@ import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.sort.SortStatusInfo;
import org.apache.inlong.manager.pojo.sort.SortStatusRequest;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.service.core.ConfigLoader;
import org.apache.inlong.manager.service.core.SortClusterService;
-import org.apache.inlong.manager.service.core.SortConfigLoader;
import org.apache.inlong.manager.service.core.SortService;
import org.apache.inlong.manager.service.core.SortSourceService;
import org.apache.inlong.manager.service.group.InlongGroupService;
@@ -99,7 +99,7 @@ public class SortServiceImpl implements SortService, PluginBinder {
@Autowired
private InlongStreamService streamService;
@Autowired
- private SortConfigLoader configLoader;
+ private ConfigLoader configLoader;
@Autowired
private DataNodeOperatorFactory dataNodeOperatorFactory;
/**
@@ -152,7 +152,7 @@ public class SortServiceImpl implements SortService, PluginBinder {
private void setReloadTimer() {
ScheduledExecutorService executorService =
Executors.newSingleThreadScheduledExecutor();
long reloadInterval = 60000L;
- executorService.scheduleAtFixedRate(this::reload, reloadInterval,
reloadInterval, TimeUnit.MILLISECONDS);
+ executorService.scheduleWithFixedDelay(this::reload, reloadInterval,
reloadInterval, TimeUnit.MILLISECONDS);
}
@Override
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
index 247f270685..9d6fe9ceb5 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
@@ -33,7 +33,7 @@ import org.apache.inlong.manager.pojo.sort.standalone.SortSourceClusterInfo;
import org.apache.inlong.manager.pojo.sort.standalone.SortSourceGroupInfo;
import org.apache.inlong.manager.pojo.sort.standalone.SortSourceStreamInfo;
import org.apache.inlong.manager.pojo.sort.standalone.SortSourceStreamSinkInfo;
-import org.apache.inlong.manager.service.core.SortConfigLoader;
+import org.apache.inlong.manager.service.core.ConfigLoader;
import org.apache.inlong.manager.service.core.SortSourceService;
import com.google.gson.Gson;
@@ -111,7 +111,7 @@ public class SortSourceServiceImpl implements SortSourceService {
private Map<String, Map<String, List<SortSourceStreamSinkInfo>>>
streamSinkMap;
@Autowired
- private SortConfigLoader configLoader;
+ private ConfigLoader configLoader;
@PostConstruct
public void initialize() {
@@ -458,6 +458,6 @@ public class SortSourceServiceImpl implements SortSourceService {
private void setReloadTimer() {
ScheduledExecutorService executorService =
Executors.newSingleThreadScheduledExecutor();
long reloadInterval = 60000L;
- executorService.scheduleAtFixedRate(this::reload, reloadInterval,
reloadInterval, TimeUnit.MILLISECONDS);
+ executorService.scheduleWithFixedDelay(this::reload, reloadInterval,
reloadInterval, TimeUnit.MILLISECONDS);
}
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/StreamTaskListenerFactory.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/StreamTaskListenerFactory.java
index 5069423c13..ba729202af 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/StreamTaskListenerFactory.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/StreamTaskListenerFactory.java
@@ -22,6 +22,7 @@ import org.apache.inlong.manager.common.plugin.PluginBinder;
import
org.apache.inlong.manager.service.listener.queue.StreamQueueResourceListener;
import
org.apache.inlong.manager.service.listener.sink.StreamSinkResourceListener;
import
org.apache.inlong.manager.service.listener.sort.StreamSortConfigListener;
+import org.apache.inlong.manager.service.listener.source.SourceStartListener;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.definition.ServiceTaskType;
import org.apache.inlong.manager.workflow.definition.TaskListenerFactory;
@@ -60,6 +61,8 @@ public class StreamTaskListenerFactory implements PluginBinder, TaskListenerFact
private StreamSortConfigListener streamSortConfigListener;
@Autowired
private StreamSinkResourceListener sinkResourceListener;
+ @Autowired
+ private SourceStartListener sourceStartListener;
@PostConstruct
public void init() {
@@ -70,6 +73,7 @@ public class StreamTaskListenerFactory implements PluginBinder, TaskListenerFact
sortOperateListeners.add(streamSortConfigListener);
sinkOperateListeners = new LinkedList<>();
sinkOperateListeners.add(sinkResourceListener);
+ sourceOperateListeners.add(sourceStartListener);
}
@Override
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/SourceStartListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/SourceStartListener.java
new file mode 100644
index 0000000000..2ee6f504ae
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/SourceStartListener.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.listener.source;
+
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.enums.GroupOperateType;
+import org.apache.inlong.manager.common.enums.TaskEvent;
+import org.apache.inlong.manager.pojo.source.SourceRequest;
+import org.apache.inlong.manager.pojo.source.StreamSource;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import
org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm;
+import org.apache.inlong.manager.service.source.SourceOperatorFactory;
+import org.apache.inlong.manager.service.source.StreamSourceOperator;
+import org.apache.inlong.manager.service.source.StreamSourceService;
+import org.apache.inlong.manager.workflow.WorkflowContext;
+import org.apache.inlong.manager.workflow.event.ListenerResult;
+import org.apache.inlong.manager.workflow.event.task.SourceOperateListener;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+
+@Slf4j
+@Component
+public class SourceStartListener implements SourceOperateListener {
+
+ @Autowired
+ protected StreamSourceService streamSourceService;
+ @Autowired
+ private SourceOperatorFactory operatorFactory;
+
+ @Override
+ public String name() {
+ return getClass().getSimpleName();
+ }
+
+ @Override
+ public TaskEvent event() {
+ return TaskEvent.COMPLETE;
+ }
+
+ @Override
+ public boolean accept(WorkflowContext context) {
+ if (isGroupProcessForm(context)) {
+ return false;
+ }
+ StreamResourceProcessForm processForm = (StreamResourceProcessForm)
context.getProcessForm();
+ return
InlongConstants.STANDARD_MODE.equals(processForm.getGroupInfo().getInlongGroupMode())
+ && processForm.getGroupOperateType() == GroupOperateType.INIT;
+ }
+
+ @Override
+ public ListenerResult listen(WorkflowContext context) throws Exception {
+ StreamResourceProcessForm form = (StreamResourceProcessForm)
context.getProcessForm();
+ String operator = context.getOperator();
+ InlongStreamInfo streamInfo = form.getStreamInfo();
+ final String groupId = streamInfo.getInlongGroupId();
+ final String streamId = streamInfo.getInlongStreamId();
+ log.info("begin to update agent task config for groupId={},
streamId={}", groupId, streamId);
+ List<StreamSource> sources = streamSourceService.listSource(groupId,
streamId);
+ for (StreamSource source : sources) {
+ SourceRequest request = source.genSourceRequest();
+ StreamSourceOperator sourceOperator =
operatorFactory.getInstance(request.getSourceType());
+ sourceOperator.updateAgentTaskConfig(request, operator);
+ }
+ log.info("success to update agent task config for groupId={},
streamId={}", groupId, streamId);
+ return ListenerResult.success();
+ }
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
index d4c951c209..d4087d8976 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
@@ -46,7 +46,7 @@ import org.apache.inlong.manager.pojo.dataproxy.InlongGroupId;
import org.apache.inlong.manager.pojo.dataproxy.InlongStreamId;
import org.apache.inlong.manager.pojo.dataproxy.ProxyCluster;
import org.apache.inlong.manager.pojo.sink.SinkPageRequest;
-import org.apache.inlong.manager.service.core.SortConfigLoader;
+import org.apache.inlong.manager.service.core.ConfigLoader;
import com.google.common.base.Splitter;
import com.google.common.collect.Sets;
@@ -122,7 +122,7 @@ public class DataProxyConfigRepository implements IRepository {
@Autowired
private StreamSinkEntityMapper streamSinkMapper;
@Autowired
- private SortConfigLoader sortConfigLoader;
+ private ConfigLoader configLoader;
@PostConstruct
public void initialize() {
@@ -364,7 +364,7 @@ public class DataProxyConfigRepository implements IRepository {
Map<String, Map<String, String>> groupParams = new HashMap<>();
groupIdMap.forEach((k, v) -> groupParams.put(k,
fromJsonToMap(v.getExtParams())));
// reload inlong group ext
- List<InlongGroupExtEntity> groupExtCursor = sortConfigLoader
+ List<InlongGroupExtEntity> groupExtCursor = configLoader
.loadGroupBackupInfo(ClusterSwitch.BACKUP_CLUSTER_TAG);
groupExtCursor.forEach(v ->
groupParams.computeIfAbsent(v.getInlongGroupId(), k -> new HashMap<>())
.put(ClusterSwitch.BACKUP_CLUSTER_TAG, v.getKeyValue()));
@@ -390,7 +390,7 @@ public class DataProxyConfigRepository implements IRepository {
streamParams.put(k, params);
});
// reload inlong stream ext
- List<InlongStreamExtEntity> streamExtCursor = sortConfigLoader
+ List<InlongStreamExtEntity> streamExtCursor = configLoader
.loadStreamBackupInfo(ClusterSwitch.BACKUP_MQ_RESOURCE);
streamExtCursor.forEach(v -> streamParams
.computeIfAbsent(getInlongId(v.getInlongGroupId(),
v.getInlongStreamId()), k -> new HashMap<>())
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
index 39d3845209..2ecd79fb37 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
@@ -17,28 +17,58 @@
package org.apache.inlong.manager.service.source;
+import org.apache.inlong.common.constant.MQType;
import org.apache.inlong.common.enums.DataTypeEnum;
+import org.apache.inlong.common.enums.TaskStateEnum;
+import org.apache.inlong.common.enums.TaskTypeEnum;
+import org.apache.inlong.common.pojo.agent.AgentConfigInfo;
+import org.apache.inlong.common.pojo.agent.AgentResponseCode;
+import org.apache.inlong.common.pojo.agent.CmdConfig;
+import org.apache.inlong.common.pojo.agent.DataConfig;
+import org.apache.inlong.common.pojo.agent.TaskResult;
+import org.apache.inlong.common.pojo.dataproxy.DataProxyTopicInfo;
+import org.apache.inlong.common.pojo.dataproxy.MQClusterInfo;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.enums.SourceStatus;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.dao.entity.AgentTaskConfigEntity;
+import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
+import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
+import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
import org.apache.inlong.manager.dao.entity.InlongStreamFieldEntity;
import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
import org.apache.inlong.manager.dao.entity.StreamSourceFieldEntity;
+import org.apache.inlong.manager.dao.mapper.AgentTaskConfigEntityMapper;
+import org.apache.inlong.manager.dao.mapper.DataSourceCmdConfigEntityMapper;
+import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper;
+import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
+import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongStreamFieldEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSourceFieldEntityMapper;
+import org.apache.inlong.manager.pojo.cluster.ClusterPageRequest;
+import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterDTO;
import org.apache.inlong.manager.pojo.common.PageResult;
+import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarDTO;
import org.apache.inlong.manager.pojo.source.DataAddTaskRequest;
import org.apache.inlong.manager.pojo.source.SourceRequest;
import org.apache.inlong.manager.pojo.source.StreamSource;
+import org.apache.inlong.manager.pojo.source.file.FileSourceDTO;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.stream.StreamField;
import org.apache.inlong.manager.service.node.DataNodeService;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.pagehelper.Page;
+import com.google.common.collect.Lists;
+import com.google.gson.Gson;
+import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
@@ -48,8 +78,17 @@ import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Transactional;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.inlong.manager.common.consts.InlongConstants.DOT;
+import static
org.apache.inlong.manager.pojo.stream.InlongStreamExtParam.unpackExtParams;
/**
* Default operator of stream source.
@@ -57,6 +96,8 @@ import java.util.Objects;
public abstract class AbstractSourceOperator implements StreamSourceOperator {
private static final Logger LOGGER =
LoggerFactory.getLogger(AbstractSourceOperator.class);
+ private static final Gson GSON = new Gson();
+ private static final int MODULUS_100 = 100;
@Autowired
protected StreamSourceEntityMapper sourceMapper;
@@ -66,6 +107,18 @@ public abstract class AbstractSourceOperator implements StreamSourceOperator {
protected InlongStreamFieldEntityMapper streamFieldMapper;
@Autowired
protected DataNodeService dataNodeService;
+ @Autowired
+ private AgentTaskConfigEntityMapper agentTaskConfigEntityMapper;
+ @Autowired
+ private InlongGroupEntityMapper groupMapper;
+ @Autowired
+ private InlongClusterEntityMapper clusterMapper;
+ @Autowired
+ private DataSourceCmdConfigEntityMapper sourceCmdConfigMapper;
+ @Autowired
+ private InlongStreamEntityMapper streamMapper;
+ @Autowired
+ private ObjectMapper objectMapper;
/**
* Getting the source type.
@@ -109,6 +162,9 @@ public abstract class AbstractSourceOperator implements StreamSourceOperator {
if (request.getEnableSyncSchema()) {
syncSourceFieldInfo(request, operator);
}
+ if
(GroupStatus.forCode(groupStatus).equals(GroupStatus.CONFIG_SUCCESSFUL)) {
+ updateAgentTaskConfig(request, operator);
+ }
return entity.getId();
}
@@ -207,6 +263,9 @@ public abstract class AbstractSourceOperator implements StreamSourceOperator {
}
updateFieldOpt(entity, request.getFieldList());
LOGGER.debug("success to update source of type={}",
request.getSourceType());
+ if
(GroupStatus.forCode(groupStatus).equals(GroupStatus.CONFIG_SUCCESSFUL)) {
+ updateAgentTaskConfig(request, operator);
+ }
}
@Override
@@ -232,6 +291,7 @@ public abstract class AbstractSourceOperator implements StreamSourceOperator {
curEntity.getVersion());
throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
}
+ updateAgentTaskConfig(request, operator);
}
@Override
@@ -346,4 +406,219 @@ public abstract class AbstractSourceOperator implements StreamSourceOperator {
public Integer addDataAddTask(DataAddTaskRequest request, String operator)
{
throw new BusinessException(String.format("not support data add task
for type =%s", request.getSourceType()));
}
+
+ @Override
+ public void updateAgentTaskConfig(SourceRequest request, String operator) {
+ try {
+ if (SourceType.AUTO_PUSH.equals(request.getSourceType())) {
+ return;
+ }
+ final String clusterName = request.getInlongClusterName();
+ final String ip = request.getAgentIp();
+ final String uuid = request.getUuid();
+ if (StringUtils.isBlank(clusterName) || StringUtils.isBlank(ip)) {
+ LOGGER.warn("skip update agent task config where cluster name
or ip is null for request={}", request);
+ return;
+ }
+ AgentTaskConfigEntity existEntity =
agentTaskConfigEntityMapper.selectByIdentifier(ip, clusterName);
+ AgentTaskConfigEntity agentTaskConfigEntity = new
AgentTaskConfigEntity();
+ if (existEntity != null) {
+ agentTaskConfigEntity =
CommonBeanUtils.copyProperties(existEntity, AgentTaskConfigEntity::new, true);
+ }
+ List<StreamSourceEntity> normalSourceEntities =
sourceMapper.selectByStatusAndCluster(
+
SourceStatus.NORMAL_STATUS_SET.stream().map(SourceStatus::getCode)
+ .collect(Collectors.toList()),
+ clusterName, ip, uuid);
+ List<StreamSourceEntity> taskLists = new
ArrayList<>(normalSourceEntities);
+ List<StreamSourceEntity> stopSourceEntities =
sourceMapper.selectByStatusAndCluster(
+
SourceStatus.STOP_STATUS_SET.stream().map(SourceStatus::getCode)
+ .collect(Collectors.toList()),
+ clusterName, ip, uuid);
+ taskLists.addAll(stopSourceEntities);
+ LOGGER.debug("success to add task : {}", taskLists.size());
+ List<DataConfig> runningTaskConfig = Lists.newArrayList();
+ List<CmdConfig> cmdConfigs =
sourceCmdConfigMapper.queryCmdByAgentIp(request.getAgentIp()).stream()
+ .map(cmd -> {
+ CmdConfig cmdConfig = new CmdConfig();
+ cmdConfig.setDataTime(cmd.getSpecifiedDataTime());
+ cmdConfig.setOp(cmd.getCmdType());
+ cmdConfig.setId(cmd.getId());
+ cmdConfig.setTaskId(cmd.getTaskId());
+ return cmdConfig;
+ }).collect(Collectors.toList());
+ if (CollectionUtils.isEmpty(taskLists)) {
+
agentTaskConfigEntity.setIsDeleted(agentTaskConfigEntity.getId());
+
agentTaskConfigEntityMapper.updateByIdSelective(agentTaskConfigEntity);
+ return;
+ }
+ for (StreamSourceEntity sourceEntity : taskLists) {
+ int op = sourceEntity.getStatus() % MODULUS_100;
+ DataConfig dataConfig = getDataConfig(sourceEntity, op);
+ runningTaskConfig.add(dataConfig);
+ }
+ TaskResult taskResult =
+
TaskResult.builder().dataConfigs(runningTaskConfig).cmdConfigs(cmdConfigs).build();
+ String md5 = DigestUtils.md5Hex(GSON.toJson(taskResult));
+ taskResult.setMd5(md5);
+ taskResult.setCode(AgentResponseCode.SUCCESS);
+ agentTaskConfigEntity.setAgentIp(request.getAgentIp());
+
agentTaskConfigEntity.setClusterName(request.getInlongClusterName());
+
agentTaskConfigEntity.setTaskParams(objectMapper.writeValueAsString(taskResult));
+
+ LOGGER.debug("begin to get agent config info for {}", request);
+ Set<String> tagSet = new HashSet<>(16);
+ InlongGroupEntity groupEntity =
+
groupMapper.selectByGroupIdWithoutTenant(request.getInlongGroupId());
+ String clusterTag = groupEntity.getInlongClusterTag();
+ InlongClusterEntity agentClusterInfo =
clusterMapper.selectByNameAndType(request.getInlongClusterName(),
+ ClusterType.AGENT);
+ AgentConfigInfo agentConfigInfo = AgentConfigInfo.builder()
+ .cluster(AgentConfigInfo.AgentClusterInfo.builder()
+ .parentId(agentClusterInfo.getId())
+ .clusterName(agentClusterInfo.getName())
+ .build())
+ .build();
+ if (StringUtils.isNotBlank(clusterTag)) {
+
tagSet.addAll(Arrays.asList(clusterTag.split(InlongConstants.COMMA)));
+ List<String> clusterTagList = new ArrayList<>(tagSet);
+ ClusterPageRequest pageRequest = ClusterPageRequest.builder()
+ .type(ClusterType.AGENT_ZK)
+ .clusterTagList(clusterTagList)
+ .build();
+ List<InlongClusterEntity> agentZkCluster =
clusterMapper.selectByCondition(pageRequest);
+ if (CollectionUtils.isNotEmpty(agentZkCluster)) {
+ agentConfigInfo.setZkUrl(agentZkCluster.get(0).getUrl());
+ }
+ }
+
+ String jsonStr = GSON.toJson(agentConfigInfo);
+ String configMd5 = DigestUtils.md5Hex(jsonStr);
+ agentConfigInfo.setMd5(configMd5);
+ agentConfigInfo.setCode(AgentResponseCode.SUCCESS);
+
agentTaskConfigEntity.setConfigParams(objectMapper.writeValueAsString(agentConfigInfo));
+ agentClusterInfo.setModifier(operator);
+ if (existEntity == null) {
+ agentTaskConfigEntity.setCreator(operator);
+ agentTaskConfigEntityMapper.insert(agentTaskConfigEntity);
+ } else {
+
agentTaskConfigEntityMapper.updateByIdSelective(agentTaskConfigEntity);
+ }
+ LOGGER.debug("success to update agent config info for: {}, result:
{}", request, agentConfigInfo);
+ } catch (Exception e) {
+ String errMsg = String.format("update agent task config failed for
groupId=%s, streamId=%s, ip=%s",
+ request.getInlongGroupId(), request.getInlongStreamId(),
request.getAgentIp());
+ LOGGER.error(errMsg, e);
+ throw new BusinessException(errMsg);
+ }
+ }
+
+ private DataConfig getDataConfig(StreamSourceEntity entity, int op) {
+ DataConfig dataConfig = new DataConfig();
+ dataConfig.setIp(entity.getAgentIp());
+ dataConfig.setUuid(entity.getUuid());
+ dataConfig.setOp(String.valueOf(op));
+ dataConfig.setTaskId(entity.getId());
+ dataConfig.setTaskType(getTaskType(entity));
+ dataConfig.setTaskName(entity.getSourceName());
+ dataConfig.setSnapshot(entity.getSnapshot());
+ dataConfig.setTimeZone(entity.getDataTimeZone());
+ dataConfig.setVersion(entity.getVersion());
+
+ String groupId = entity.getInlongGroupId();
+ String streamId = entity.getInlongStreamId();
+ dataConfig.setInlongGroupId(groupId);
+ dataConfig.setInlongStreamId(streamId);
+
+ InlongGroupEntity groupEntity =
groupMapper.selectByGroupIdWithoutTenant(groupId);
+ InlongStreamEntity streamEntity =
streamMapper.selectByIdentifier(groupId, streamId);
+ String extParams = getExtParams(entity);
+ if (groupEntity != null && streamEntity != null) {
+ dataConfig.setState(
+
SourceStatus.NORMAL_STATUS_SET.contains(SourceStatus.forCode(entity.getStatus()))
+ ? TaskStateEnum.RUNNING.getType()
+ : TaskStateEnum.FROZEN.getType());
+ dataConfig.setSyncSend(streamEntity.getSyncSend());
+ if (SourceType.FILE.equalsIgnoreCase(entity.getSourceType())) {
+ String dataSeparator = String.valueOf((char)
Integer.parseInt(streamEntity.getDataSeparator()));
+ FileSourceDTO fileSourceDTO = JsonUtils.parseObject(extParams,
FileSourceDTO.class);
+ if (Objects.nonNull(fileSourceDTO)) {
+ fileSourceDTO.setDataSeparator(dataSeparator);
+
dataConfig.setAuditVersion(fileSourceDTO.getAuditVersion());
+
fileSourceDTO.setDataContentStyle(streamEntity.getDataType());
+ extParams = JsonUtils.toJsonString(fileSourceDTO);
+ }
+ }
+ InlongStreamInfo streamInfo =
CommonBeanUtils.copyProperties(streamEntity, InlongStreamInfo::new);
+ // Processing extParams
+ unpackExtParams(streamEntity.getExtParams(), streamInfo);
+ dataConfig.setPredefinedFields(streamInfo.getPredefinedFields());
+
+ int dataReportType = groupEntity.getDataReportType();
+ dataConfig.setDataReportType(dataReportType);
+ if (InlongConstants.REPORT_TO_MQ_RECEIVED == dataReportType) {
+ // add mq cluster setting
+ List<MQClusterInfo> mqSet = new ArrayList<>();
+ List<String> clusterTagList =
Collections.singletonList(groupEntity.getInlongClusterTag());
+ ClusterPageRequest pageRequest = ClusterPageRequest.builder()
+ .type(groupEntity.getMqType())
+ .clusterTagList(clusterTagList)
+ .build();
+ List<InlongClusterEntity> mqClusterList =
clusterMapper.selectByCondition(pageRequest);
+ for (InlongClusterEntity cluster : mqClusterList) {
+ MQClusterInfo clusterInfo = new MQClusterInfo();
+ clusterInfo.setUrl(cluster.getUrl());
+ clusterInfo.setToken(cluster.getToken());
+ clusterInfo.setMqType(cluster.getType());
+
clusterInfo.setParams(JsonUtils.parseObject(cluster.getExtParams(),
HashMap.class));
+ mqSet.add(clusterInfo);
+ }
+ dataConfig.setMqClusters(mqSet);
+
+ // add topic setting
+ String mqResource = groupEntity.getMqResource();
+ String mqType = groupEntity.getMqType();
+ if (MQType.PULSAR.equals(mqType) ||
MQType.TDMQ_PULSAR.equals(mqType)) {
+ // first get the tenant from the InlongGroup, and then get
it from the PulsarCluster.
+ InlongPulsarDTO pulsarDTO =
InlongPulsarDTO.getFromJson(groupEntity.getExtParams());
+ String tenant = pulsarDTO.getPulsarTenant();
+ if (StringUtils.isBlank(tenant)) {
+ // If there are multiple Pulsar clusters, take the
first one.
+ // Note that the tenants in multiple Pulsar clusters
must be identical.
+ PulsarClusterDTO pulsarCluster =
PulsarClusterDTO.getFromJson(
+ mqClusterList.get(0).getExtParams());
+ tenant = pulsarCluster.getPulsarTenant();
+ }
+
+ String topic =
String.format(InlongConstants.PULSAR_TOPIC_FORMAT,
+ tenant, mqResource, streamEntity.getMqResource());
+ DataProxyTopicInfo topicConfig = new DataProxyTopicInfo();
+ topicConfig.setInlongGroupId(groupId + "/" + streamId);
+ topicConfig.setTopic(topic);
+ dataConfig.setTopicInfo(topicConfig);
+ } else if (MQType.TUBEMQ.equals(mqType)) {
+ DataProxyTopicInfo topicConfig = new DataProxyTopicInfo();
+ topicConfig.setInlongGroupId(groupId);
+ topicConfig.setTopic(mqResource);
+ dataConfig.setTopicInfo(topicConfig);
+ } else if (MQType.KAFKA.equals(mqType)) {
+ DataProxyTopicInfo topicConfig = new DataProxyTopicInfo();
+ topicConfig.setInlongGroupId(groupId);
+ topicConfig.setTopic(groupEntity.getMqResource() + DOT +
streamEntity.getMqResource());
+ dataConfig.setTopicInfo(topicConfig);
+ }
+ } else {
+ LOGGER.warn("set syncSend=[0] as the stream not exists for
groupId={}, streamId={}", groupId, streamId);
+ }
+ }
+ dataConfig.setExtParams(extParams);
+ return dataConfig;
+ }
+
+ private int getTaskType(StreamSourceEntity sourceEntity) {
+ TaskTypeEnum taskType =
SourceType.SOURCE_TASK_MAP.get(sourceEntity.getSourceType());
+ if (taskType == null) {
+ throw new BusinessException("Unsupported task type for source type
" + sourceEntity.getSourceType());
+ }
+ return taskType.getType();
+ }
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java
index 997f39b867..e820140ae6 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java
@@ -136,4 +136,12 @@ public interface StreamSourceOperator {
*/
Integer addDataAddTask(DataAddTaskRequest request, String operator);
+ /**
+ * Update the agent task config info.
+ *
+ * @param request source request
+ * @param operator name of the operator
+ */
+ void updateAgentTaskConfig(SourceRequest request, String operator);
+
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/TemplateServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/TemplateServiceImpl.java
index a76328ee4d..37953f1276 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/TemplateServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/TemplateServiceImpl.java
@@ -116,7 +116,7 @@ public class TemplateServiceImpl implements TemplateService {
TemplateEntity templateEntity =
templateEntityMapper.selectByName(templateName);
if (templateEntity == null) {
LOGGER.error("inlong template not found by template name={}",
templateName);
- throw new BusinessException(ErrorCodeEnum.TEMPLATE_INFO_INCORRECT);
+ throw new BusinessException(ErrorCodeEnum.TEMPLATE_NOT_FOUND);
}
TemplateInfo templateInfo =
CommonBeanUtils.copyProperties(templateEntity, TemplateInfo::new);
@@ -182,7 +182,7 @@ public class TemplateServiceImpl implements TemplateService {
TemplateEntity templateEntity =
templateEntityMapper.selectByName(templateName);
if (templateEntity == null) {
LOGGER.error("inlong template not found by template name={}",
templateName);
- throw new BusinessException(ErrorCodeEnum.TEMPLATE_INFO_INCORRECT);
+ throw new BusinessException(ErrorCodeEnum.TEMPLATE_NOT_FOUND);
}
if (templateEntity.getInCharges().contains(operator)) {
diff --git a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
index 8a0bcbf9c9..531a639d4a 100644
--- a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
+++ b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
@@ -901,6 +901,26 @@ CREATE TABLE IF NOT EXISTS `cluster_config`
UNIQUE KEY `unique_clustert_config_sink_id` (`cluster_tag`, `is_deleted`)
);
+-- ----------------------------
+-- Table structure for agent_task_config
+-- ----------------------------
+CREATE TABLE IF NOT EXISTS `agent_task_config`
+(
+ `id` int(11) NOT NULL AUTO_INCREMENT COMMENT
'Incremental primary key',
+ `config_params` text DEFAULT NULL COMMENT 'The agent config
params',
+ `task_params` text NOT NULL COMMENT 'The agent task
config params',
+ `agent_ip` varchar(128) NOT NULL COMMENT 'agent ip',
+ `cluster_name` varchar(128) NOT NULL COMMENT 'Inlong cluster name',
+ `creator` varchar(128) DEFAULT NULL COMMENT 'Creator',
+ `modifier` varchar(128) DEFAULT NULL COMMENT 'Modifier name',
+ `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP
COMMENT 'Create time',
+ `modify_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON
UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+ `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to
delete, 0 is not deleted, if greater than 0, delete',
+ `version` int(11) NOT NULL DEFAULT '1' COMMENT 'Version
number, which will be incremented by 1 after modification',
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `unique_agent_task_config_ip_cluster_name` (`agent_ip`,
`cluster_name`, `is_deleted`)
+);
+
-- ----------------------------
-- Table structure for template
-- ----------------------------
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index dfb6420dc9..430982df28 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -952,6 +952,27 @@ CREATE TABLE IF NOT EXISTS `cluster_config`
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT = 'cluster_config';
+-- ----------------------------
+-- Table structure for agent_task_config: holds the agent config params and agent task config params; the unique key allows at most one row per (agent_ip, cluster_name, is_deleted)
+-- ----------------------------
+CREATE TABLE IF NOT EXISTS `agent_task_config`
+(
+ `id` int(11) NOT NULL AUTO_INCREMENT COMMENT
'Incremental primary key',
+ `config_params` text DEFAULT NULL COMMENT 'The agent config
params',
+ `task_params` text NOT NULL COMMENT 'The agent task
config params',
+ `agent_ip` varchar(128) NOT NULL COMMENT 'agent ip',
+ `cluster_name` varchar(128) NOT NULL COMMENT 'Inlong cluster name',
+ `creator` varchar(128) DEFAULT NULL COMMENT 'Creator',
+ `modifier` varchar(128) DEFAULT NULL COMMENT 'Modifier name',
+ `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP
COMMENT 'Create time',
+ `modify_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON
UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+ `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to
delete, 0 is not deleted, if greater than 0, delete',
+ `version` int(11) NOT NULL DEFAULT '1' COMMENT 'Version
number, which will be incremented by 1 after modification',
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `unique_agent_task_config_ip_cluster_name` (`agent_ip`,
`cluster_name`, `is_deleted`)
+) ENGINE = InnoDB
+ DEFAULT CHARSET = utf8mb4 COMMENT = 'agent_task_config';
+
-- ----------------------------
-- Table structure for template
-- ----------------------------
diff --git a/inlong-manager/manager-web/sql/changes-1.13.0.sql
b/inlong-manager/manager-web/sql/changes-1.13.0.sql
index 5000782285..166c98f199 100644
--- a/inlong-manager/manager-web/sql/changes-1.13.0.sql
+++ b/inlong-manager/manager-web/sql/changes-1.13.0.sql
@@ -118,3 +118,24 @@ CREATE TABLE IF NOT EXISTS `schedule_config`
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT = 'schedule_config';
-- ----------------------------
+
+-- ----------------------------
+-- Table structure for agent_task_config: holds the agent config params and agent task config params; the unique key allows at most one row per (agent_ip, cluster_name, is_deleted)
+-- ----------------------------
+CREATE TABLE IF NOT EXISTS `agent_task_config`
+(
+ `id` int(11) NOT NULL AUTO_INCREMENT COMMENT
'Incremental primary key',
+ `config_params` text DEFAULT NULL COMMENT 'The agent config
params',
+ `task_params` text NOT NULL COMMENT 'The agent task
config params',
+ `agent_ip` varchar(128) NOT NULL COMMENT 'agent ip',
+ `cluster_name` varchar(128) NOT NULL COMMENT 'Inlong cluster name',
+ `creator` varchar(128) DEFAULT NULL COMMENT 'Creator',
+ `modifier` varchar(128) DEFAULT NULL COMMENT 'Modifier name',
+ `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP
COMMENT 'Create time',
+ `modify_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON
UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+ `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to
delete, 0 is not deleted, if greater than 0, delete',
+ `version` int(11) NOT NULL DEFAULT '1' COMMENT 'Version
number, which will be incremented by 1 after modification',
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `unique_agent_task_config_ip_cluster_name` (`agent_ip`,
`cluster_name`, `is_deleted`)
+) ENGINE = InnoDB
+ DEFAULT CHARSET = utf8mb4 COMMENT = 'agent_task_config';