This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 5abb311b14 [INLONG-11323][Manager] Modify the parameters of the data add tasks for file collection (#11324)
5abb311b14 is described below

commit 5abb311b1457486db45b714a545e0f28c658943a
Author: fuweng11 <[email protected]>
AuthorDate: Fri Oct 11 15:56:15 2024 +0800

    [INLONG-11323][Manager] Modify the parameters of the data add tasks for file collection (#11324)
---
 .../manager/pojo/source/DataAddTaskRequest.java    | 11 +++-
 .../service/source/AbstractSourceOperator.java     | 69 ++++++++++++----------
 .../service/source/StreamSourceService.java        | 11 +---
 .../service/source/StreamSourceServiceImpl.java    | 44 ++++++++------
 .../web/controller/StreamSourceController.java     |  9 +--
 5 files changed, 77 insertions(+), 67 deletions(-)

diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/DataAddTaskRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/DataAddTaskRequest.java
index d835c4ee37..fccc8c06ff 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/DataAddTaskRequest.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/DataAddTaskRequest.java
@@ -25,6 +25,8 @@ import org.hibernate.validator.constraints.Length;
 
 import javax.validation.constraints.NotBlank;
 
+import java.util.List;
+
 /**
  * Data add task information
  */
@@ -33,9 +35,16 @@ import javax.validation.constraints.NotBlank;
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, visible = true, property = 
"sourceType")
 public class DataAddTaskRequest {
 
-    @ApiModelProperty(value = "Source ID")
+    @ApiModelProperty(value = "Group Id")
+    @NotBlank(message = "inlongGroupId cannot be blank")
+    private String groupId;
+
+    @ApiModelProperty(value = "Source ID", hidden = true)
     private Integer sourceId;
 
+    @ApiModelProperty(value = "Agent ip List")
+    private List<String> agentIpList;
+
     @ApiModelProperty("Source type, including: FILE, KAFKA, etc.")
     @NotBlank(message = "sourceType cannot be blank")
     @Length(min = 1, max = 20, message = "length must be between 1 and 20")
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
index 5caf6d2473..885fd2a99e 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
@@ -421,6 +421,42 @@ public abstract class AbstractSourceOperator implements 
StreamSourceOperator {
             if (existEntity != null) {
                 agentTaskConfigEntity = 
CommonBeanUtils.copyProperties(existEntity, AgentTaskConfigEntity::new, true);
             }
+
+            LOGGER.debug("begin to get agent config info for {}", request);
+            Set<String> tagSet = new HashSet<>(16);
+            InlongClusterEntity agentClusterInfo = 
clusterMapper.selectByNameAndType(request.getInlongClusterName(),
+                    ClusterType.AGENT);
+            if (agentClusterInfo == null) {
+                
agentTaskConfigEntity.setIsDeleted(agentTaskConfigEntity.getId());
+                
agentTaskConfigEntityMapper.updateByIdSelective(agentTaskConfigEntity);
+                return;
+            }
+            String clusterTag = agentClusterInfo.getClusterTags();
+            AgentConfigInfo agentConfigInfo = AgentConfigInfo.builder()
+                    .cluster(AgentConfigInfo.AgentClusterInfo.builder()
+                            .parentId(agentClusterInfo.getId())
+                            .clusterName(agentClusterInfo.getName())
+                            .build())
+                    .build();
+            if (StringUtils.isNotBlank(clusterTag)) {
+                
tagSet.addAll(Arrays.asList(clusterTag.split(InlongConstants.COMMA)));
+                List<String> clusterTagList = new ArrayList<>(tagSet);
+                ClusterPageRequest pageRequest = ClusterPageRequest.builder()
+                        .type(ClusterType.AGENT_ZK)
+                        .clusterTagList(clusterTagList)
+                        .build();
+                List<InlongClusterEntity> agentZkCluster = 
clusterMapper.selectByCondition(pageRequest);
+                if (CollectionUtils.isNotEmpty(agentZkCluster)) {
+                    agentConfigInfo.setZkUrl(agentZkCluster.get(0).getUrl());
+                }
+            }
+
+            String jsonStr = GSON.toJson(agentConfigInfo);
+            String configMd5 = DigestUtils.md5Hex(jsonStr);
+            agentConfigInfo.setMd5(configMd5);
+            agentConfigInfo.setCode(AgentResponseCode.SUCCESS);
+            
agentTaskConfigEntity.setConfigParams(objectMapper.writeValueAsString(agentConfigInfo));
+
             List<StreamSourceEntity> normalSourceEntities = 
sourceMapper.selectByStatusAndCluster(
                     
SourceStatus.NORMAL_STATUS_SET.stream().map(SourceStatus::getCode)
                             .collect(Collectors.toList()),
@@ -443,7 +479,7 @@ public abstract class AbstractSourceOperator implements 
StreamSourceOperator {
                         return cmdConfig;
                     }).collect(Collectors.toList());
             if (CollectionUtils.isEmpty(taskLists)) {
-                
agentTaskConfigEntity.setIsDeleted(agentTaskConfigEntity.getId());
+                agentTaskConfigEntity.setTaskParams(null);
                 
agentTaskConfigEntityMapper.updateByIdSelective(agentTaskConfigEntity);
                 return;
             }
@@ -461,37 +497,6 @@ public abstract class AbstractSourceOperator implements 
StreamSourceOperator {
             
agentTaskConfigEntity.setClusterName(request.getInlongClusterName());
             
agentTaskConfigEntity.setTaskParams(objectMapper.writeValueAsString(taskResult));
 
-            LOGGER.debug("begin to get agent config info for {}", request);
-            Set<String> tagSet = new HashSet<>(16);
-            InlongGroupEntity groupEntity =
-                    
groupMapper.selectByGroupIdWithoutTenant(request.getInlongGroupId());
-            String clusterTag = groupEntity.getInlongClusterTag();
-            InlongClusterEntity agentClusterInfo = 
clusterMapper.selectByNameAndType(request.getInlongClusterName(),
-                    ClusterType.AGENT);
-            AgentConfigInfo agentConfigInfo = AgentConfigInfo.builder()
-                    .cluster(AgentConfigInfo.AgentClusterInfo.builder()
-                            .parentId(agentClusterInfo.getId())
-                            .clusterName(agentClusterInfo.getName())
-                            .build())
-                    .build();
-            if (StringUtils.isNotBlank(clusterTag)) {
-                
tagSet.addAll(Arrays.asList(clusterTag.split(InlongConstants.COMMA)));
-                List<String> clusterTagList = new ArrayList<>(tagSet);
-                ClusterPageRequest pageRequest = ClusterPageRequest.builder()
-                        .type(ClusterType.AGENT_ZK)
-                        .clusterTagList(clusterTagList)
-                        .build();
-                List<InlongClusterEntity> agentZkCluster = 
clusterMapper.selectByCondition(pageRequest);
-                if (CollectionUtils.isNotEmpty(agentZkCluster)) {
-                    agentConfigInfo.setZkUrl(agentZkCluster.get(0).getUrl());
-                }
-            }
-
-            String jsonStr = GSON.toJson(agentConfigInfo);
-            String configMd5 = DigestUtils.md5Hex(jsonStr);
-            agentConfigInfo.setMd5(configMd5);
-            agentConfigInfo.setCode(AgentResponseCode.SUCCESS);
-            
agentTaskConfigEntity.setConfigParams(objectMapper.writeValueAsString(agentConfigInfo));
             agentClusterInfo.setModifier(operator);
             if (existEntity == null) {
                 agentTaskConfigEntity.setCreator(operator);
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
index 9be7f06172..50b5150817 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
@@ -211,15 +211,6 @@ public interface StreamSourceService {
      * @param operator Operator's name.
      * @return source id after saving.
      */
-    Integer addDataAddTask(DataAddTaskRequest request, String operator);
+    List<Integer> addDataAddTask(DataAddTaskRequest request, String operator);
 
-    /**
-     * Batch Save the data add task information
-     *
-     * @param requestList Source request list.
-     * @param operator Operator's name.
-     * @return source id list after saving.
-     */
-    List<Integer> batchAddDataAddTask(String groupId, List<DataAddTaskRequest> 
requestList,
-            String operator);
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index 0241524dcf..de96386b25 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -68,6 +68,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -533,25 +534,32 @@ public class StreamSourceServiceImpl implements 
StreamSourceService {
     }
 
     @Override
-    public Integer addDataAddTask(DataAddTaskRequest request, String operator) 
{
+    public List<Integer> addDataAddTask(DataAddTaskRequest request, String 
operator) {
         LOGGER.info("begin to add data add task info: {}", request);
-        StreamSourceEntity entity = 
sourceMapper.selectById(request.getSourceId());
-        StreamSourceOperator sourceOperator = 
operatorFactory.getInstance(entity.getSourceType());
-        int id = sourceOperator.addDataAddTask(request, operator);
-        LOGGER.info("success to add data add task info: {}", request);
-        return id;
-    }
-
-    @Override
-    public List<Integer> batchAddDataAddTask(String groupId, 
List<DataAddTaskRequest> requestList,
-            String operator) {
-        List<Integer> result = new ArrayList<>();
-        String auditVersion = 
String.valueOf(sourceMapper.selectDataAddTaskCount(groupId, null));
-        for (DataAddTaskRequest request : requestList) {
-            request.setAuditVersion(auditVersion);
-            int id = addDataAddTask(request, operator);
-            result.add(id);
+        String auditVersion = 
String.valueOf(sourceMapper.selectDataAddTaskCount(request.getGroupId(), null));
+        request.setAuditVersion(auditVersion);
+        List<String> agentIpList = request.getAgentIpList();
+        List<StreamSourceEntity> entityList = new ArrayList<>();
+        List<Integer> resultIdList = new ArrayList<>();
+        if (CollectionUtils.isEmpty(agentIpList)) {
+            entityList = sourceMapper.selectByRelatedId(request.getGroupId(), 
null, null);
+        } else {
+            for (String agentIp : agentIpList) {
+                List<StreamSourceEntity> sourceEntityList = 
sourceMapper.selectByAgentIp(agentIp);
+                entityList.addAll(sourceEntityList);
+            }
         }
-        return result;
+        for (StreamSourceEntity sourceEntity : entityList) {
+            if (sourceEntity.getTaskMapId() != null || 
!Objects.equals(sourceEntity.getInlongGroupId(),
+                    request.getGroupId())) {
+                continue;
+            }
+            StreamSourceOperator sourceOperator = 
operatorFactory.getInstance(sourceEntity.getSourceType());
+            request.setSourceId(sourceEntity.getId());
+            int id = sourceOperator.addDataAddTask(request, operator);
+            resultIdList.add(id);
+        }
+        LOGGER.info("success to add data add task info: {}, data add task 
size: {}", request, resultIdList.size());
+        return resultIdList;
     }
 }
diff --git 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java
 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java
index d60fa3bad5..4253747ee1 100644
--- 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java
+++ 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java
@@ -130,13 +130,10 @@ public class StreamSourceController {
                 sourceService.forceDelete(inlongGroupId, inlongStreamId, 
LoginUserUtils.getLoginUser().getName()));
     }
 
-    @RequestMapping(value = "/source/addDataAddTask/{groupId}", method = 
RequestMethod.POST)
+    @RequestMapping(value = "/source/addDataAddTask", method = 
RequestMethod.POST)
     @ApiOperation(value = "Add supplementary recording task for stream source")
-    @ApiImplicitParam(name = "groupId", dataTypeClass = String.class, required 
= true)
-    public Response<List<Integer>> addSub(@PathVariable String groupId,
-            @RequestBody List<DataAddTaskRequest> requestList) {
-        return Response.success(
-                sourceService.batchAddDataAddTask(groupId, requestList, 
LoginUserUtils.getLoginUser().getName()));
+    public Response<List<Integer>> addSub(@RequestBody DataAddTaskRequest 
request) {
+        return Response.success(sourceService.addDataAddTask(request, 
LoginUserUtils.getLoginUser().getName()));
     }
 
 }

Reply via email to