This is an automated email from the ASF dual-hosted git repository.
aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 5abb311b14 [INLONG-11323][Manager] Modify the parameters of the data add tasks for file collection (#11324)
5abb311b14 is described below
commit 5abb311b1457486db45b714a545e0f28c658943a
Author: fuweng11 <[email protected]>
AuthorDate: Fri Oct 11 15:56:15 2024 +0800
[INLONG-11323][Manager] Modify the parameters of the data add tasks for file collection (#11324)
---
.../manager/pojo/source/DataAddTaskRequest.java | 11 +++-
.../service/source/AbstractSourceOperator.java | 69 ++++++++++++----------
.../service/source/StreamSourceService.java | 11 +---
.../service/source/StreamSourceServiceImpl.java | 44 ++++++++------
.../web/controller/StreamSourceController.java | 9 +--
5 files changed, 77 insertions(+), 67 deletions(-)
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/DataAddTaskRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/DataAddTaskRequest.java
index d835c4ee37..fccc8c06ff 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/DataAddTaskRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/DataAddTaskRequest.java
@@ -25,6 +25,8 @@ import org.hibernate.validator.constraints.Length;
import javax.validation.constraints.NotBlank;
+import java.util.List;
+
/**
* Data add task information
*/
@@ -33,9 +35,16 @@ import javax.validation.constraints.NotBlank;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, visible = true, property =
"sourceType")
public class DataAddTaskRequest {
- @ApiModelProperty(value = "Source ID")
+ @ApiModelProperty(value = "Group Id")
+ @NotBlank(message = "inlongGroupId cannot be blank")
+ private String groupId;
+
+ @ApiModelProperty(value = "Source ID", hidden = true)
private Integer sourceId;
+ @ApiModelProperty(value = "Agent ip List")
+ private List<String> agentIpList;
+
@ApiModelProperty("Source type, including: FILE, KAFKA, etc.")
@NotBlank(message = "sourceType cannot be blank")
@Length(min = 1, max = 20, message = "length must be between 1 and 20")
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
index 5caf6d2473..885fd2a99e 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
@@ -421,6 +421,42 @@ public abstract class AbstractSourceOperator implements
StreamSourceOperator {
if (existEntity != null) {
agentTaskConfigEntity =
CommonBeanUtils.copyProperties(existEntity, AgentTaskConfigEntity::new, true);
}
+
+ LOGGER.debug("begin to get agent config info for {}", request);
+ Set<String> tagSet = new HashSet<>(16);
+ InlongClusterEntity agentClusterInfo =
clusterMapper.selectByNameAndType(request.getInlongClusterName(),
+ ClusterType.AGENT);
+ if (agentClusterInfo == null) {
+
agentTaskConfigEntity.setIsDeleted(agentTaskConfigEntity.getId());
+
agentTaskConfigEntityMapper.updateByIdSelective(agentTaskConfigEntity);
+ return;
+ }
+ String clusterTag = agentClusterInfo.getClusterTags();
+ AgentConfigInfo agentConfigInfo = AgentConfigInfo.builder()
+ .cluster(AgentConfigInfo.AgentClusterInfo.builder()
+ .parentId(agentClusterInfo.getId())
+ .clusterName(agentClusterInfo.getName())
+ .build())
+ .build();
+ if (StringUtils.isNotBlank(clusterTag)) {
+
tagSet.addAll(Arrays.asList(clusterTag.split(InlongConstants.COMMA)));
+ List<String> clusterTagList = new ArrayList<>(tagSet);
+ ClusterPageRequest pageRequest = ClusterPageRequest.builder()
+ .type(ClusterType.AGENT_ZK)
+ .clusterTagList(clusterTagList)
+ .build();
+ List<InlongClusterEntity> agentZkCluster =
clusterMapper.selectByCondition(pageRequest);
+ if (CollectionUtils.isNotEmpty(agentZkCluster)) {
+ agentConfigInfo.setZkUrl(agentZkCluster.get(0).getUrl());
+ }
+ }
+
+ String jsonStr = GSON.toJson(agentConfigInfo);
+ String configMd5 = DigestUtils.md5Hex(jsonStr);
+ agentConfigInfo.setMd5(configMd5);
+ agentConfigInfo.setCode(AgentResponseCode.SUCCESS);
+
agentTaskConfigEntity.setConfigParams(objectMapper.writeValueAsString(agentConfigInfo));
+
List<StreamSourceEntity> normalSourceEntities =
sourceMapper.selectByStatusAndCluster(
SourceStatus.NORMAL_STATUS_SET.stream().map(SourceStatus::getCode)
.collect(Collectors.toList()),
@@ -443,7 +479,7 @@ public abstract class AbstractSourceOperator implements
StreamSourceOperator {
return cmdConfig;
}).collect(Collectors.toList());
if (CollectionUtils.isEmpty(taskLists)) {
-
agentTaskConfigEntity.setIsDeleted(agentTaskConfigEntity.getId());
+ agentTaskConfigEntity.setTaskParams(null);
agentTaskConfigEntityMapper.updateByIdSelective(agentTaskConfigEntity);
return;
}
@@ -461,37 +497,6 @@ public abstract class AbstractSourceOperator implements
StreamSourceOperator {
agentTaskConfigEntity.setClusterName(request.getInlongClusterName());
agentTaskConfigEntity.setTaskParams(objectMapper.writeValueAsString(taskResult));
- LOGGER.debug("begin to get agent config info for {}", request);
- Set<String> tagSet = new HashSet<>(16);
- InlongGroupEntity groupEntity =
-
groupMapper.selectByGroupIdWithoutTenant(request.getInlongGroupId());
- String clusterTag = groupEntity.getInlongClusterTag();
- InlongClusterEntity agentClusterInfo =
clusterMapper.selectByNameAndType(request.getInlongClusterName(),
- ClusterType.AGENT);
- AgentConfigInfo agentConfigInfo = AgentConfigInfo.builder()
- .cluster(AgentConfigInfo.AgentClusterInfo.builder()
- .parentId(agentClusterInfo.getId())
- .clusterName(agentClusterInfo.getName())
- .build())
- .build();
- if (StringUtils.isNotBlank(clusterTag)) {
-
tagSet.addAll(Arrays.asList(clusterTag.split(InlongConstants.COMMA)));
- List<String> clusterTagList = new ArrayList<>(tagSet);
- ClusterPageRequest pageRequest = ClusterPageRequest.builder()
- .type(ClusterType.AGENT_ZK)
- .clusterTagList(clusterTagList)
- .build();
- List<InlongClusterEntity> agentZkCluster =
clusterMapper.selectByCondition(pageRequest);
- if (CollectionUtils.isNotEmpty(agentZkCluster)) {
- agentConfigInfo.setZkUrl(agentZkCluster.get(0).getUrl());
- }
- }
-
- String jsonStr = GSON.toJson(agentConfigInfo);
- String configMd5 = DigestUtils.md5Hex(jsonStr);
- agentConfigInfo.setMd5(configMd5);
- agentConfigInfo.setCode(AgentResponseCode.SUCCESS);
-
agentTaskConfigEntity.setConfigParams(objectMapper.writeValueAsString(agentConfigInfo));
agentClusterInfo.setModifier(operator);
if (existEntity == null) {
agentTaskConfigEntity.setCreator(operator);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
index 9be7f06172..50b5150817 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
@@ -211,15 +211,6 @@ public interface StreamSourceService {
* @param operator Operator's name.
* @return source id after saving.
*/
- Integer addDataAddTask(DataAddTaskRequest request, String operator);
+ List<Integer> addDataAddTask(DataAddTaskRequest request, String operator);
- /**
- * Batch Save the data add task information
- *
- * @param requestList Source request list.
- * @param operator Operator's name.
- * @return source id list after saving.
- */
- List<Integer> batchAddDataAddTask(String groupId, List<DataAddTaskRequest>
requestList,
- String operator);
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index 0241524dcf..de96386b25 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -68,6 +68,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
@@ -533,25 +534,32 @@ public class StreamSourceServiceImpl implements
StreamSourceService {
}
@Override
- public Integer addDataAddTask(DataAddTaskRequest request, String operator)
{
+ public List<Integer> addDataAddTask(DataAddTaskRequest request, String
operator) {
LOGGER.info("begin to add data add task info: {}", request);
- StreamSourceEntity entity =
sourceMapper.selectById(request.getSourceId());
- StreamSourceOperator sourceOperator =
operatorFactory.getInstance(entity.getSourceType());
- int id = sourceOperator.addDataAddTask(request, operator);
- LOGGER.info("success to add data add task info: {}", request);
- return id;
- }
-
- @Override
- public List<Integer> batchAddDataAddTask(String groupId,
List<DataAddTaskRequest> requestList,
- String operator) {
- List<Integer> result = new ArrayList<>();
- String auditVersion =
String.valueOf(sourceMapper.selectDataAddTaskCount(groupId, null));
- for (DataAddTaskRequest request : requestList) {
- request.setAuditVersion(auditVersion);
- int id = addDataAddTask(request, operator);
- result.add(id);
+ String auditVersion =
String.valueOf(sourceMapper.selectDataAddTaskCount(request.getGroupId(), null));
+ request.setAuditVersion(auditVersion);
+ List<String> agentIpList = request.getAgentIpList();
+ List<StreamSourceEntity> entityList = new ArrayList<>();
+ List<Integer> resultIdList = new ArrayList<>();
+ if (CollectionUtils.isEmpty(agentIpList)) {
+ entityList = sourceMapper.selectByRelatedId(request.getGroupId(),
null, null);
+ } else {
+ for (String agentIp : agentIpList) {
+ List<StreamSourceEntity> sourceEntityList =
sourceMapper.selectByAgentIp(agentIp);
+ entityList.addAll(sourceEntityList);
+ }
}
- return result;
+ for (StreamSourceEntity sourceEntity : entityList) {
+ if (sourceEntity.getTaskMapId() != null ||
!Objects.equals(sourceEntity.getInlongGroupId(),
+ request.getGroupId())) {
+ continue;
+ }
+ StreamSourceOperator sourceOperator =
operatorFactory.getInstance(sourceEntity.getSourceType());
+ request.setSourceId(sourceEntity.getId());
+ int id = sourceOperator.addDataAddTask(request, operator);
+ resultIdList.add(id);
+ }
+ LOGGER.info("success to add data add task info: {}, data add task
size: {}", request, resultIdList.size());
+ return resultIdList;
}
}
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java
index d60fa3bad5..4253747ee1 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java
@@ -130,13 +130,10 @@ public class StreamSourceController {
sourceService.forceDelete(inlongGroupId, inlongStreamId,
LoginUserUtils.getLoginUser().getName()));
}
- @RequestMapping(value = "/source/addDataAddTask/{groupId}", method =
RequestMethod.POST)
+ @RequestMapping(value = "/source/addDataAddTask", method =
RequestMethod.POST)
@ApiOperation(value = "Add supplementary recording task for stream source")
- @ApiImplicitParam(name = "groupId", dataTypeClass = String.class, required
= true)
- public Response<List<Integer>> addSub(@PathVariable String groupId,
- @RequestBody List<DataAddTaskRequest> requestList) {
- return Response.success(
- sourceService.batchAddDataAddTask(groupId, requestList,
LoginUserUtils.getLoginUser().getName()));
+ public Response<List<Integer>> addSub(@RequestBody DataAddTaskRequest
request) {
+ return Response.success(sourceService.addDataAddTask(request,
LoginUserUtils.getLoginUser().getName()));
}
}