This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 4a705be607 [INLONG-11487][Manager] Support adding data add tasks based on the source ID (#11488)
4a705be607 is described below

commit 4a705be6078ce8b8ce10fd1e172488262a0af3be
Author: fuweng11 <[email protected]>
AuthorDate: Thu Nov 14 11:29:05 2024 +0800

    [INLONG-11487][Manager] Support adding data add tasks based on the source ID (#11488)
---
 .../manager/pojo/source/DataAddTaskRequest.java       | 13 ++++++++++++-
 .../pojo/source/file/FileDataAddTaskRequest.java      |  6 ------
 .../inlong/manager/pojo/source/file/FileSource.java   |  8 ++++----
 .../manager/pojo/source/file/FileSourceDTO.java       |  8 ++++----
 .../manager/pojo/source/file/FileSourceRequest.java   |  9 ++++-----
 .../cluster/node/AgentClusterNodeInstallOperator.java |  5 +++++
 .../manager/service/cmd/CommandExecutorImpl.java      |  3 ++-
 .../service/source/StreamSourceServiceImpl.java       | 19 ++++++++++++++-----
 .../service/source/file/FileSourceOperator.java       |  8 +++++---
 9 files changed, 50 insertions(+), 29 deletions(-)

diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/DataAddTaskRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/DataAddTaskRequest.java
index fccc8c06ff..31d4801f11 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/DataAddTaskRequest.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/DataAddTaskRequest.java
@@ -24,6 +24,7 @@ import lombok.Data;
 import org.hibernate.validator.constraints.Length;
 
 import javax.validation.constraints.NotBlank;
+import javax.validation.constraints.NotNull;
 
 import java.util.List;
 
@@ -39,7 +40,7 @@ public class DataAddTaskRequest {
     @NotBlank(message = "inlongGroupId cannot be blank")
     private String groupId;
 
-    @ApiModelProperty(value = "Source ID", hidden = true)
+    @ApiModelProperty(value = "Source ID, When the source ID is not null, data 
add task entries based on the source ID")
     private Integer sourceId;
 
     @ApiModelProperty(value = "Agent ip List")
@@ -53,4 +54,14 @@ public class DataAddTaskRequest {
     @ApiModelProperty(value = "Audit version", hidden = true)
     private String auditVersion;
 
+    @ApiModelProperty(value = "Data start time")
+    private String dataTimeFrom;
+
+    @ApiModelProperty(value = "Data end time")
+    private String dataTimeTo;
+
+    @ApiModelProperty(value = "Increase Audit Version")
+    @NotNull(message = "IncreaseAuditVersion cannot be null")
+    private Boolean increaseAuditVersion;
+
 }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileDataAddTaskRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileDataAddTaskRequest.java
index 6cb5fe1e42..20bcf08e9f 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileDataAddTaskRequest.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileDataAddTaskRequest.java
@@ -39,10 +39,4 @@ public class FileDataAddTaskRequest extends 
DataAddTaskRequest {
     @ApiModelProperty("filterStreams")
     private List<String> filterStreams;
 
-    @ApiModelProperty("Start time")
-    private Long startTime;
-
-    @ApiModelProperty("End time")
-    private Long endTime;
-
 }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSource.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSource.java
index 6b3cc382db..1be07993a9 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSource.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSource.java
@@ -89,11 +89,11 @@ public class FileSource extends StreamSource {
     @ApiModelProperty("Whether retry")
     private Boolean retry;
 
-    @ApiModelProperty("Start time")
-    private Long startTime;
+    @ApiModelProperty(value = "Data start time")
+    private String dataTimeFrom;
 
-    @ApiModelProperty("End time")
-    private Long endTime;
+    @ApiModelProperty(value = "Data end time")
+    private String dataTimeTo;
 
     @ApiModelProperty("filterStreams")
     private List<String> filterStreams;
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceDTO.java
index db4ba25c12..f23622d060 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceDTO.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceDTO.java
@@ -92,11 +92,11 @@ public class FileSourceDTO {
     @ApiModelProperty("Whether retry")
     private Boolean retry = false;
 
-    @ApiModelProperty("Start time")
-    private Long startTime = 0L;
+    @ApiModelProperty(value = "Data start time")
+    private String dataTimeFrom;
 
-    @ApiModelProperty("End time")
-    private Long endTime = 0L;
+    @ApiModelProperty(value = "Data end time")
+    private String dataTimeTo;
 
     @ApiModelProperty(value = "Audit version")
     private String auditVersion;
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceRequest.java
index d0100ecd1b..777b231629 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceRequest.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceRequest.java
@@ -84,12 +84,11 @@ public class FileSourceRequest extends SourceRequest {
     @ApiModelProperty("Whether retry")
     private Boolean retry;
 
-    @ApiModelProperty("Start time")
-    private Long startTime;
-
-    @ApiModelProperty("End time")
-    private Long endTime;
+    @ApiModelProperty(value = "Data start time")
+    private String dataTimeFrom;
 
+    @ApiModelProperty(value = "Data end time")
+    private String dataTimeTo;
     @ApiModelProperty("filterStreams")
     private List<String> filterStreams;
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/AgentClusterNodeInstallOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/AgentClusterNodeInstallOperator.java
index 49ee5dd669..4d8e19a9ac 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/AgentClusterNodeInstallOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/AgentClusterNodeInstallOperator.java
@@ -59,6 +59,7 @@ public class AgentClusterNodeInstallOperator implements 
InlongClusterNodeInstall
 
     public static final String INSTALLER_CONF_PATH = 
"/conf/installer.properties";
     public static final String INSTALLER_START_CMD = "/bin/installer.sh start";
+    public static final String CRONTAB_START_CMD = "/bin/crontab.sh";
     public static final String INSTALLER_RESTART_CMD = "/bin/installer.sh 
restart";
     public static final String INSTALLER_STOP_CMD = "/bin/installer.sh 
restart";
     public static final String AGENT_MANAGER_AUTH_SECRET_ID = 
"agent.manager.auth.secretId";
@@ -115,6 +116,8 @@ public class AgentClusterNodeInstallOperator implements 
InlongClusterNodeInstall
             deployInstaller(request, operator);
             String startCmd = agentInstallPath + INSTALLER_START_CMD;
             commandExecutor.execRemote(request, startCmd);
+            String crontabStartCmd = agentInstallPath + CRONTAB_START_CMD;
+            commandExecutor.execRemote(request, crontabStartCmd);
             
clusterNodeEntityMapper.updateOperateLogById(clusterNodeRequest.getId(),
                     NodeStatus.INSTALL_SUCCESS.getStatus(), currentTime + 
InlongConstants.BLANK + "success to install");
         } catch (Exception e) {
@@ -149,6 +152,8 @@ public class AgentClusterNodeInstallOperator implements 
InlongClusterNodeInstall
             commandExecutor.cpDir(request, agentInstallTempPath + 
"/modules.json", agentInstallPath + "/conf");
             String reStartCmd = agentInstallPath + INSTALLER_RESTART_CMD;
             commandExecutor.execRemote(request, reStartCmd);
+            String crontabStartCmd = agentInstallPath + CRONTAB_START_CMD;
+            commandExecutor.execRemote(request, crontabStartCmd);
             
clusterNodeEntityMapper.updateOperateLogById(clusterNodeRequest.getId(), 
NodeStatus.NORMAL.getStatus(),
                     currentTime + InlongConstants.BLANK + "success to 
reinstall");
         } catch (Exception e) {
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cmd/CommandExecutorImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cmd/CommandExecutorImpl.java
index cb44988bbf..85356195b5 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cmd/CommandExecutorImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cmd/CommandExecutorImpl.java
@@ -147,7 +147,8 @@ public class CommandExecutorImpl implements CommandExecutor 
{
     @Override
     public CommandResult cpDir(AgentClusterNodeRequest clusterNodeRequest, 
String sourcePath, String targetPath)
             throws Exception {
-        return execRemote(clusterNodeRequest, "cp " + sourcePath + " " + 
targetPath);
+        return execRemote(clusterNodeRequest,
+                "if [ -e " + sourcePath + " ]; then cp " + sourcePath + " " + 
targetPath + "; fi");
     }
 
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index e82d9d2aeb..324c996910 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -545,12 +545,21 @@ public class StreamSourceServiceImpl implements 
StreamSourceService {
         List<String> agentIpList = request.getAgentIpList();
         List<StreamSourceEntity> entityList = new ArrayList<>();
         List<Integer> resultIdList = new ArrayList<>();
-        if (CollectionUtils.isEmpty(agentIpList)) {
-            entityList = sourceMapper.selectByRelatedId(request.getGroupId(), 
null, null);
+        if (request.getSourceId() != null) {
+            StreamSourceEntity entity = 
sourceMapper.selectById(request.getSourceId());
+            Preconditions.expectNotNull(entity, 
ErrorCodeEnum.SOURCE_INFO_NOT_FOUND);
+            entityList.add(entity);
         } else {
-            for (String agentIp : agentIpList) {
-                List<StreamSourceEntity> sourceEntityList = 
sourceMapper.selectByAgentIp(agentIp);
-                entityList.addAll(sourceEntityList);
+            if (agentIpList == null) {
+                throw new BusinessException("Agent ip list can not null");
+            }
+            if (CollectionUtils.isEmpty(agentIpList)) {
+                entityList = 
sourceMapper.selectByRelatedId(request.getGroupId(), null, null);
+            } else {
+                for (String agentIp : agentIpList) {
+                    List<StreamSourceEntity> sourceEntityList = 
sourceMapper.selectByAgentIp(agentIp);
+                    entityList.addAll(sourceEntityList);
+                }
             }
         }
         for (StreamSourceEntity sourceEntity : entityList) {
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java
index fa10f8081f..faf797dc7a 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java
@@ -114,10 +114,12 @@ public class FileSourceOperator extends 
AbstractSourceOperator {
         try {
             List<StreamSourceEntity> dataAddTaskList = 
sourceMapper.selectByTaskMapId(sourceEntity.getId());
             FileSourceDTO dto = 
FileSourceDTO.getFromJson(sourceEntity.getExtParams());
-            dto.setStartTime(sourceRequest.getStartTime());
-            dto.setEndTime(sourceRequest.getEndTime());
+            dto.setDataTimeFrom(sourceRequest.getDataTimeFrom());
+            dto.setDataTimeTo(sourceRequest.getDataTimeTo());
             dto.setRetry(true);
-            dto.setAuditVersion(request.getAuditVersion());
+            if (request.getIncreaseAuditVersion()) {
+                dto.setAuditVersion(request.getAuditVersion());
+            }
             dto.setFilterStreams(sourceRequest.getFilterStreams());
             StreamSourceEntity dataAddTaskEntity =
                     CommonBeanUtils.copyProperties(sourceEntity, 
StreamSourceEntity::new);

Reply via email to