This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 6645261800 [INLONG-10245][Manager] Support setting audit version for
file collection (#10246)
6645261800 is described below
commit 66452618003f8c7c2294882528fb578cb3086172
Author: fuweng11 <[email protected]>
AuthorDate: Tue May 21 18:21:36 2024 +0800
[INLONG-10245][Manager] Support setting audit version for file collection
(#10246)
---
.../apache/inlong/common/pojo/agent/DataConfig.java | 1 +
.../manager/dao/mapper/StreamSourceEntityMapper.java | 5 +++++
.../resources/mappers/StreamSourceEntityMapper.xml | 13 +++++++++++++
.../manager/pojo/source/DataAddTaskRequest.java | 3 +++
.../inlong/manager/pojo/source/SourceRequest.java | 3 +++
.../inlong/manager/pojo/source/StreamSource.java | 3 +++
.../manager/pojo/source/file/FileSourceDTO.java | 3 +++
.../manager/service/core/impl/AgentServiceImpl.java | 20 ++++++++------------
.../manager/service/source/StreamSourceService.java | 9 +++++++++
.../service/source/StreamSourceServiceImpl.java | 13 +++++++++++++
.../web/controller/StreamSourceController.java | 12 +++++++++---
11 files changed, 70 insertions(+), 15 deletions(-)
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java
index b214909aa4..7c2eff93e2 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java
@@ -50,6 +50,7 @@ public class DataConfig {
private Integer state;
private String predefinedFields;
private String timeZone;
+ private String auditVersion;
private String extParams;
/**
* The task version.
diff --git
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
index 53d28bbbab..7340d182c3 100644
---
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
+++
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
@@ -62,6 +62,11 @@ public interface StreamSourceEntityMapper {
*/
int selectCount(@Param("groupId") String groupId, @Param("streamId")
String streamId);
+ /**
+ * Count the data add tasks under the given inlong group id and inlong stream id,
+ * i.e. the stream sources whose task map id is not null.
+ *
+ * @param groupId inlong group id; not used as a filter when null or empty
+ * @param streamId inlong stream id; not used as a filter when null or empty
+ * @return the number of matching data add tasks
+ */
+ int selectDataAddTaskCount(@Param("groupId") String groupId,
@Param("streamId") String streamId);
+
/**
* Paging query source list based on conditions
*/
diff --git
a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
index c1101ed2f6..9008bcb7a1 100644
---
a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
+++
b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
@@ -120,6 +120,19 @@
</if>
</where>
</select>
+ <!-- Counts the data add tasks: stream_source rows whose task_map_id is not NULL.
+ The group id / stream id conditions are only applied when the parameter is non-empty. -->
+ <select id="selectDataAddTaskCount" resultType="java.lang.Integer">
+ select count(1)
+ from stream_source
+ <where>
+ <if test="groupId != null and groupId != ''">
+ and inlong_group_id = #{groupId, jdbcType=VARCHAR}
+ </if>
+ <if test="streamId != null and streamId != ''">
+ and inlong_stream_id = #{streamId, jdbcType=VARCHAR}
+ </if>
+ and task_map_id is not NULL
+ </where>
+ </select>
<select id="selectByCondition"
parameterType="org.apache.inlong.manager.pojo.source.SourcePageRequest"
resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/DataAddTaskRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/DataAddTaskRequest.java
index 521551825a..d835c4ee37 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/DataAddTaskRequest.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/DataAddTaskRequest.java
@@ -41,4 +41,7 @@ public class DataAddTaskRequest {
@Length(min = 1, max = 20, message = "length must be between 1 and 20")
private String sourceType;
+ @ApiModelProperty(value = "Audit version", hidden = true)
+ private String auditVersion;
+
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java
index 6a31f9039f..94c23888a1 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java
@@ -111,6 +111,9 @@ public class SourceRequest {
@NotNull(groups = UpdateValidation.class, message = "version cannot be
null")
private Integer version;
+ @ApiModelProperty(value = "Audit version")
+ private String auditVersion;
+
@ApiModelProperty("Field list, only support when inlong group in light
weight mode")
private List<StreamField> fieldList;
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/StreamSource.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/StreamSource.java
index cc56ecb6a7..4f99b32e65 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/StreamSource.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/StreamSource.java
@@ -114,6 +114,9 @@ public abstract class StreamSource extends StreamNode {
@ApiModelProperty("Null if not a data add task")
private Integer taskMapId;
+ @ApiModelProperty(value = "Audit version")
+ private String auditVersion;
+
@ApiModelProperty("Data add task information of existing agents")
private List<DataAddTaskDTO> dataAddTaskList;
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceDTO.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceDTO.java
index a9a20d7fb4..98d2ac046c 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceDTO.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceDTO.java
@@ -98,6 +98,9 @@ public class FileSourceDTO {
@ApiModelProperty("End time")
private Long endTime = 0L;
+ @ApiModelProperty(value = "Audit version")
+ private String auditVersion;
+
@ApiModelProperty("Metadata filters by label, special parameters for K8S")
private Map<String, String> filterMetaByLabels;
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index 0deade816e..52268e0a41 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -673,10 +673,15 @@ public class AgentServiceImpl implements AgentService {
? TaskStateEnum.RUNNING.getType()
: TaskStateEnum.FROZEN.getType());
dataConfig.setSyncSend(streamEntity.getSyncSend());
- if (SourceType.FILE.equalsIgnoreCase(entity.getSourceType())
- &&
StringUtils.isNotBlank(streamEntity.getDataSeparator())) {
+ if (SourceType.FILE.equalsIgnoreCase(entity.getSourceType())) {
- String dataSeparator = String.valueOf((char)
- Integer.parseInt(streamEntity.getDataSeparator()));
- extParams = getExtParams(extParams, dataSeparator);
+ FileSourceDTO fileSourceDTO = JsonUtils.parseObject(extParams,
FileSourceDTO.class);
+ if (Objects.nonNull(fileSourceDTO)) {
+ // The stream's data separator is held as a decimal character code. Convert it only
+ // when it is configured: Integer.parseInt throws NumberFormatException on a
+ // null or blank value, which the removed isNotBlank guard used to prevent.
+ if (StringUtils.isNotBlank(streamEntity.getDataSeparator())) {
+ fileSourceDTO.setDataSeparator(
+ String.valueOf((char) Integer.parseInt(streamEntity.getDataSeparator())));
+ }
+ // The audit version and the content style are propagated whether or not a
+ // separator is configured.
+ dataConfig.setAuditVersion(fileSourceDTO.getAuditVersion());
+ fileSourceDTO.setDataContentStyle(streamEntity.getDataType());
+ extParams = JsonUtils.toJsonString(fileSourceDTO);
+ }
}
InlongStreamInfo streamInfo =
CommonBeanUtils.copyProperties(streamEntity, InlongStreamInfo::new);
// Processing extParams
@@ -744,15 +749,6 @@ public class AgentServiceImpl implements AgentService {
return dataConfig;
}
- private String getExtParams(String extParams, String dataSeparator) {
- FileSourceDTO fileSourceDTO = JsonUtils.parseObject(extParams,
FileSourceDTO.class);
- if (Objects.nonNull(fileSourceDTO)) {
- fileSourceDTO.setDataSeparator(dataSeparator);
- return JsonUtils.toJsonString(fileSourceDTO);
- }
- return extParams;
- }
-
/**
* Get the Task type from the stream source entity.
*
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
index 879ca5e012..0a2cfa9239 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
@@ -249,4 +249,13 @@ public interface StreamSourceService {
*/
Integer addDataAddTask(DataAddTaskRequest request, String operator);
+ /**
+ * Batch save the data add task information.
+ *
+ * @param groupId Inlong group id, used together with the stream id to look up
+ *        the existing data add tasks.
+ * @param streamId Inlong stream id, used together with the group id to look up
+ *        the existing data add tasks.
+ * @param requestList Data add task request list.
+ * @param operator Operator's name.
+ * @return source id list after saving, one id per request.
+ */
+ List<Integer> batchAddDataAddTask(String groupId, String streamId,
List<DataAddTaskRequest> requestList,
+ String operator);
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index c252a6a00c..ec4d70f076 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -665,4 +665,17 @@ public class StreamSourceServiceImpl implements
StreamSourceService {
LOGGER.info("success to add data add task info: {}", request);
return id;
}
+
+ /**
+ * Saves a batch of data add tasks that all share one audit version.
+ *
+ * @param groupId inlong group id used to count the existing data add tasks
+ * @param streamId inlong stream id used to count the existing data add tasks
+ * @param requestList data add task requests; a null or empty list saves nothing
+ * @param operator operator's name
+ * @return ids returned by addDataAddTask, in request order; empty when there is nothing to save
+ */
+ @Override
+ public List<Integer> batchAddDataAddTask(String groupId, String streamId,
List<DataAddTaskRequest> requestList,
+ String operator) {
+ List<Integer> result = new ArrayList<>();
+ // Nothing to save: skip the count query and avoid a NullPointerException on a null list.
+ if (requestList == null || requestList.isEmpty()) {
+ return result;
+ }
+ // The number of data add tasks already stored for this group/stream is used as the
+ // audit version; it is read once so every request of the batch gets the same value.
+ String auditVersion =
String.valueOf(sourceMapper.selectDataAddTaskCount(groupId, streamId));
+ for (DataAddTaskRequest request : requestList) {
+ request.setAuditVersion(auditVersion);
+ int id = addDataAddTask(request, operator);
+ result.add(id);
+ }
+ return result;
+ }
}
diff --git
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java
index e410af5c1e..1f62edd358 100644
---
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java
+++
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java
@@ -130,10 +130,16 @@ public class StreamSourceController {
sourceService.forceDelete(inlongGroupId, inlongStreamId,
LoginUserUtils.getLoginUser().getName()));
}
- @RequestMapping(value = "/source/addDataAddTask", method =
RequestMethod.POST)
+ /**
+ * Adds a batch of data add (supplementary recording) tasks.
+ * The groupId and streamId path variables and the request body list are forwarded to
+ * sourceService.batchAddDataAddTask, with the login user's name as the operator.
+ *
+ * @param groupId group id taken from the request path
+ * @param streamId stream id taken from the request path
+ * @param requestList data add task requests taken from the request body
+ * @return response wrapping the id list returned by the service
+ */
+ @RequestMapping(value = "/source/addDataAddTask/{groupId}/{streamId}",
method = RequestMethod.POST)
@ApiOperation(value = "Add supplementary recording task for stream source")
- public Response<Integer> addSub(@RequestBody DataAddTaskRequest request) {
- return Response.success(sourceService.addDataAddTask(request,
LoginUserUtils.getLoginUser().getName()));
+ @ApiImplicitParams({
+ @ApiImplicitParam(name = "groupId", dataTypeClass = String.class,
required = true),
+ @ApiImplicitParam(name = "streamId", dataTypeClass = String.class,
required = true)
+ })
+ public Response<List<Integer>> addSub(@PathVariable String groupId,
@PathVariable String streamId,
+ @RequestBody List<DataAddTaskRequest> requestList) {
+ return Response.success(sourceService.batchAddDataAddTask(groupId,
streamId, requestList,
+ LoginUserUtils.getLoginUser().getName()));
}
}