This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 6645261800 [INLONG-10245][Manager] Support setting audit version for file collection (#10246)
6645261800 is described below

commit 66452618003f8c7c2294882528fb578cb3086172
Author: fuweng11 <[email protected]>
AuthorDate: Tue May 21 18:21:36 2024 +0800

    [INLONG-10245][Manager] Support setting audit version for file collection (#10246)
---
 .../apache/inlong/common/pojo/agent/DataConfig.java  |  1 +
 .../manager/dao/mapper/StreamSourceEntityMapper.java |  5 +++++
 .../resources/mappers/StreamSourceEntityMapper.xml   | 13 +++++++++++++
 .../manager/pojo/source/DataAddTaskRequest.java      |  3 +++
 .../inlong/manager/pojo/source/SourceRequest.java    |  3 +++
 .../inlong/manager/pojo/source/StreamSource.java     |  3 +++
 .../manager/pojo/source/file/FileSourceDTO.java      |  3 +++
 .../manager/service/core/impl/AgentServiceImpl.java  | 20 ++++++++------------
 .../manager/service/source/StreamSourceService.java  |  9 +++++++++
 .../service/source/StreamSourceServiceImpl.java      | 13 +++++++++++++
 .../web/controller/StreamSourceController.java       | 12 +++++++++---
 11 files changed, 70 insertions(+), 15 deletions(-)

diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java
index b214909aa4..7c2eff93e2 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java
@@ -50,6 +50,7 @@ public class DataConfig {
     private Integer state;
     private String predefinedFields;
     private String timeZone;
+    private String auditVersion;
     private String extParams;
     /**
      * The task version.
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
index 53d28bbbab..7340d182c3 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
@@ -62,6 +62,11 @@ public interface StreamSourceEntityMapper {
      */
     int selectCount(@Param("groupId") String groupId, @Param("streamId") String streamId);
 
+    /**
+     * According to the inlong group id and inlong stream id, query the number of data add task
+     */
+    int selectDataAddTaskCount(@Param("groupId") String groupId, @Param("streamId") String streamId);
+
     /**
      * Paging query source list based on conditions
      */
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
index c1101ed2f6..9008bcb7a1 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
@@ -120,6 +120,19 @@
             </if>
         </where>
     </select>
+    <select id="selectDataAddTaskCount" resultType="java.lang.Integer">
+        select count(1)
+        from stream_source
+        <where>
+            <if test="groupId != null and groupId != ''">
+                and inlong_group_id = #{groupId, jdbcType=VARCHAR}
+            </if>
+            <if test="streamId != null and streamId != ''">
+                and inlong_stream_id = #{streamId, jdbcType=VARCHAR}
+            </if>
+            and task_map_id is not NULL
+        </where>
+    </select>
     <select id="selectByCondition"
             parameterType="org.apache.inlong.manager.pojo.source.SourcePageRequest"
             resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/DataAddTaskRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/DataAddTaskRequest.java
index 521551825a..d835c4ee37 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/DataAddTaskRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/DataAddTaskRequest.java
@@ -41,4 +41,7 @@ public class DataAddTaskRequest {
     @Length(min = 1, max = 20, message = "length must be between 1 and 20")
     private String sourceType;
 
+    @ApiModelProperty(value = "Audit version", hidden = true)
+    private String auditVersion;
+
 }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java
index 6a31f9039f..94c23888a1 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java
@@ -111,6 +111,9 @@ public class SourceRequest {
     @NotNull(groups = UpdateValidation.class, message = "version cannot be null")
     private Integer version;
 
+    @ApiModelProperty(value = "Audit version")
+    private String auditVersion;
+
     @ApiModelProperty("Field list, only support when inlong group in light weight mode")
     private List<StreamField> fieldList;
 
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/StreamSource.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/StreamSource.java
index cc56ecb6a7..4f99b32e65 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/StreamSource.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/StreamSource.java
@@ -114,6 +114,9 @@ public abstract class StreamSource extends StreamNode {
     @ApiModelProperty("Null if not a data add task")
     private Integer taskMapId;
 
+    @ApiModelProperty(value = "Audit version")
+    private String auditVersion;
+
     @ApiModelProperty("Data add task information of existing agents")
     private List<DataAddTaskDTO> dataAddTaskList;
 
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceDTO.java
index a9a20d7fb4..98d2ac046c 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceDTO.java
@@ -98,6 +98,9 @@ public class FileSourceDTO {
     @ApiModelProperty("End time")
     private Long endTime = 0L;
 
+    @ApiModelProperty(value = "Audit version")
+    private String auditVersion;
+
     @ApiModelProperty("Metadata filters by label, special parameters for K8S")
     private Map<String, String> filterMetaByLabels;
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index 0deade816e..52268e0a41 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -673,10 +673,15 @@ public class AgentServiceImpl implements AgentService {
                             ? TaskStateEnum.RUNNING.getType()
                             : TaskStateEnum.FROZEN.getType());
             dataConfig.setSyncSend(streamEntity.getSyncSend());
-            if (SourceType.FILE.equalsIgnoreCase(entity.getSourceType())
-                    && StringUtils.isNotBlank(streamEntity.getDataSeparator())) {
+            if (SourceType.FILE.equalsIgnoreCase(entity.getSourceType())) {
                 String dataSeparator = String.valueOf((char) Integer.parseInt(streamEntity.getDataSeparator()));
-                extParams = getExtParams(extParams, dataSeparator);
+                FileSourceDTO fileSourceDTO = JsonUtils.parseObject(extParams, FileSourceDTO.class);
+                if (Objects.nonNull(fileSourceDTO)) {
+                    fileSourceDTO.setDataSeparator(dataSeparator);
+                    dataConfig.setAuditVersion(fileSourceDTO.getAuditVersion());
+                    fileSourceDTO.setDataContentStyle(streamEntity.getDataType());
+                    extParams = JsonUtils.toJsonString(fileSourceDTO);
+                }
             }
             InlongStreamInfo streamInfo = CommonBeanUtils.copyProperties(streamEntity, InlongStreamInfo::new);
             // Processing extParams
@@ -744,15 +749,6 @@ public class AgentServiceImpl implements AgentService {
         return dataConfig;
     }
 
-    private String getExtParams(String extParams, String dataSeparator) {
-        FileSourceDTO fileSourceDTO = JsonUtils.parseObject(extParams, FileSourceDTO.class);
-        if (Objects.nonNull(fileSourceDTO)) {
-            fileSourceDTO.setDataSeparator(dataSeparator);
-            return JsonUtils.toJsonString(fileSourceDTO);
-        }
-        return extParams;
-    }
-
     /**
      * Get the Task type from the stream source entity.
      *
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
index 879ca5e012..0a2cfa9239 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
@@ -249,4 +249,13 @@ public interface StreamSourceService {
      */
     Integer addDataAddTask(DataAddTaskRequest request, String operator);
 
+    /**
+     * Batch Save the data add task information
+     *
+     * @param requestList Source request list.
+     * @param operator Operator's name.
+     * @return source id list after saving.
+     */
+    List<Integer> batchAddDataAddTask(String groupId, String streamId, List<DataAddTaskRequest> requestList,
+            String operator);
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index c252a6a00c..ec4d70f076 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -665,4 +665,17 @@ public class StreamSourceServiceImpl implements StreamSourceService {
         LOGGER.info("success to add data add task info: {}", request);
         return id;
     }
+
+    @Override
+    public List<Integer> batchAddDataAddTask(String groupId, String streamId, List<DataAddTaskRequest> requestList,
+            String operator) {
+        List<Integer> result = new ArrayList<>();
+        String auditVersion = String.valueOf(sourceMapper.selectDataAddTaskCount(groupId, streamId));
+        for (DataAddTaskRequest request : requestList) {
+            request.setAuditVersion(auditVersion);
+            int id = addDataAddTask(request, operator);
+            result.add(id);
+        }
+        return result;
+    }
 }
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java
index e410af5c1e..1f62edd358 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java
@@ -130,10 +130,16 @@ public class StreamSourceController {
                 sourceService.forceDelete(inlongGroupId, inlongStreamId, LoginUserUtils.getLoginUser().getName()));
     }
 
-    @RequestMapping(value = "/source/addDataAddTask", method = RequestMethod.POST)
+    @RequestMapping(value = "/source/addDataAddTask/{groupId}/{streamId}", method = RequestMethod.POST)
     @ApiOperation(value = "Add supplementary recording task for stream source")
-    public Response<Integer> addSub(@RequestBody DataAddTaskRequest request) {
-        return Response.success(sourceService.addDataAddTask(request, LoginUserUtils.getLoginUser().getName()));
+    @ApiImplicitParams({
+            @ApiImplicitParam(name = "groupId", dataTypeClass = String.class, required = true),
+            @ApiImplicitParam(name = "streamId", dataTypeClass = String.class, required = true)
+    })
+    public Response<List<Integer>> addSub(@PathVariable String groupId, @PathVariable String streamId,
+            @RequestBody List<DataAddTaskRequest> requestList) {
+        return Response.success(sourceService.batchAddDataAddTask(groupId, streamId, requestList,
+                LoginUserUtils.getLoginUser().getName()));
     }
 
 }

Reply via email to