This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 4a705be607 [INLONG-11487][Manager] Support adding data add tasks based
on the source ID (#11488)
4a705be607 is described below
commit 4a705be6078ce8b8ce10fd1e172488262a0af3be
Author: fuweng11 <[email protected]>
AuthorDate: Thu Nov 14 11:29:05 2024 +0800
[INLONG-11487][Manager] Support adding data add tasks based on the source
ID (#11488)
---
.../manager/pojo/source/DataAddTaskRequest.java | 13 ++++++++++++-
.../pojo/source/file/FileDataAddTaskRequest.java | 6 ------
.../inlong/manager/pojo/source/file/FileSource.java | 8 ++++----
.../manager/pojo/source/file/FileSourceDTO.java | 8 ++++----
.../manager/pojo/source/file/FileSourceRequest.java | 9 ++++-----
.../cluster/node/AgentClusterNodeInstallOperator.java | 5 +++++
.../manager/service/cmd/CommandExecutorImpl.java | 3 ++-
.../service/source/StreamSourceServiceImpl.java | 19 ++++++++++++++-----
.../service/source/file/FileSourceOperator.java | 8 +++++---
9 files changed, 50 insertions(+), 29 deletions(-)
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/DataAddTaskRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/DataAddTaskRequest.java
index fccc8c06ff..31d4801f11 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/DataAddTaskRequest.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/DataAddTaskRequest.java
@@ -24,6 +24,7 @@ import lombok.Data;
import org.hibernate.validator.constraints.Length;
import javax.validation.constraints.NotBlank;
+import javax.validation.constraints.NotNull;
import java.util.List;
@@ -39,7 +40,7 @@ public class DataAddTaskRequest {
@NotBlank(message = "inlongGroupId cannot be blank")
private String groupId;
- @ApiModelProperty(value = "Source ID", hidden = true)
+ @ApiModelProperty(value = "Source ID, When the source ID is not null, data
add task entries based on the source ID")
private Integer sourceId;
@ApiModelProperty(value = "Agent ip List")
@@ -53,4 +54,14 @@ public class DataAddTaskRequest {
@ApiModelProperty(value = "Audit version", hidden = true)
private String auditVersion;
+ @ApiModelProperty(value = "Data start time")
+ private String dataTimeFrom;
+
+ @ApiModelProperty(value = "Data end time")
+ private String dataTimeTo;
+
+ @ApiModelProperty(value = "Increase Audit Version")
+ @NotNull(message = "IncreaseAuditVersion cannot be null")
+ private Boolean increaseAuditVersion;
+
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileDataAddTaskRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileDataAddTaskRequest.java
index 6cb5fe1e42..20bcf08e9f 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileDataAddTaskRequest.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileDataAddTaskRequest.java
@@ -39,10 +39,4 @@ public class FileDataAddTaskRequest extends
DataAddTaskRequest {
@ApiModelProperty("filterStreams")
private List<String> filterStreams;
- @ApiModelProperty("Start time")
- private Long startTime;
-
- @ApiModelProperty("End time")
- private Long endTime;
-
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSource.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSource.java
index 6b3cc382db..1be07993a9 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSource.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSource.java
@@ -89,11 +89,11 @@ public class FileSource extends StreamSource {
@ApiModelProperty("Whether retry")
private Boolean retry;
- @ApiModelProperty("Start time")
- private Long startTime;
+ @ApiModelProperty(value = "Data start time")
+ private String dataTimeFrom;
- @ApiModelProperty("End time")
- private Long endTime;
+ @ApiModelProperty(value = "Data end time")
+ private String dataTimeTo;
@ApiModelProperty("filterStreams")
private List<String> filterStreams;
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceDTO.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceDTO.java
index db4ba25c12..f23622d060 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceDTO.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceDTO.java
@@ -92,11 +92,11 @@ public class FileSourceDTO {
@ApiModelProperty("Whether retry")
private Boolean retry = false;
- @ApiModelProperty("Start time")
- private Long startTime = 0L;
+ @ApiModelProperty(value = "Data start time")
+ private String dataTimeFrom;
- @ApiModelProperty("End time")
- private Long endTime = 0L;
+ @ApiModelProperty(value = "Data end time")
+ private String dataTimeTo;
@ApiModelProperty(value = "Audit version")
private String auditVersion;
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceRequest.java
index d0100ecd1b..777b231629 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceRequest.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceRequest.java
@@ -84,12 +84,11 @@ public class FileSourceRequest extends SourceRequest {
@ApiModelProperty("Whether retry")
private Boolean retry;
- @ApiModelProperty("Start time")
- private Long startTime;
-
- @ApiModelProperty("End time")
- private Long endTime;
+ @ApiModelProperty(value = "Data start time")
+ private String dataTimeFrom;
+ @ApiModelProperty(value = "Data end time")
+ private String dataTimeTo;
@ApiModelProperty("filterStreams")
private List<String> filterStreams;
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/AgentClusterNodeInstallOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/AgentClusterNodeInstallOperator.java
index 49ee5dd669..4d8e19a9ac 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/AgentClusterNodeInstallOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/AgentClusterNodeInstallOperator.java
@@ -59,6 +59,7 @@ public class AgentClusterNodeInstallOperator implements
InlongClusterNodeInstall
public static final String INSTALLER_CONF_PATH =
"/conf/installer.properties";
public static final String INSTALLER_START_CMD = "/bin/installer.sh start";
+ public static final String CRONTAB_START_CMD = "/bin/crontab.sh";
public static final String INSTALLER_RESTART_CMD = "/bin/installer.sh
restart";
public static final String INSTALLER_STOP_CMD = "/bin/installer.sh
restart";
public static final String AGENT_MANAGER_AUTH_SECRET_ID =
"agent.manager.auth.secretId";
@@ -115,6 +116,8 @@ public class AgentClusterNodeInstallOperator implements
InlongClusterNodeInstall
deployInstaller(request, operator);
String startCmd = agentInstallPath + INSTALLER_START_CMD;
commandExecutor.execRemote(request, startCmd);
+ String crontabStartCmd = agentInstallPath + CRONTAB_START_CMD;
+ commandExecutor.execRemote(request, crontabStartCmd);
clusterNodeEntityMapper.updateOperateLogById(clusterNodeRequest.getId(),
NodeStatus.INSTALL_SUCCESS.getStatus(), currentTime +
InlongConstants.BLANK + "success to install");
} catch (Exception e) {
@@ -149,6 +152,8 @@ public class AgentClusterNodeInstallOperator implements
InlongClusterNodeInstall
commandExecutor.cpDir(request, agentInstallTempPath +
"/modules.json", agentInstallPath + "/conf");
String reStartCmd = agentInstallPath + INSTALLER_RESTART_CMD;
commandExecutor.execRemote(request, reStartCmd);
+ String crontabStartCmd = agentInstallPath + CRONTAB_START_CMD;
+ commandExecutor.execRemote(request, crontabStartCmd);
clusterNodeEntityMapper.updateOperateLogById(clusterNodeRequest.getId(),
NodeStatus.NORMAL.getStatus(),
currentTime + InlongConstants.BLANK + "success to
reinstall");
} catch (Exception e) {
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cmd/CommandExecutorImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cmd/CommandExecutorImpl.java
index cb44988bbf..85356195b5 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cmd/CommandExecutorImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cmd/CommandExecutorImpl.java
@@ -147,7 +147,8 @@ public class CommandExecutorImpl implements CommandExecutor
{
@Override
public CommandResult cpDir(AgentClusterNodeRequest clusterNodeRequest,
String sourcePath, String targetPath)
throws Exception {
- return execRemote(clusterNodeRequest, "cp " + sourcePath + " " +
targetPath);
+ return execRemote(clusterNodeRequest,
+ "if [ -e " + sourcePath + " ]; then cp " + sourcePath + " " +
targetPath + "; fi");
}
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index e82d9d2aeb..324c996910 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -545,12 +545,21 @@ public class StreamSourceServiceImpl implements
StreamSourceService {
List<String> agentIpList = request.getAgentIpList();
List<StreamSourceEntity> entityList = new ArrayList<>();
List<Integer> resultIdList = new ArrayList<>();
- if (CollectionUtils.isEmpty(agentIpList)) {
- entityList = sourceMapper.selectByRelatedId(request.getGroupId(),
null, null);
+ if (request.getSourceId() != null) {
+ StreamSourceEntity entity =
sourceMapper.selectById(request.getSourceId());
+ Preconditions.expectNotNull(entity,
ErrorCodeEnum.SOURCE_INFO_NOT_FOUND);
+ entityList.add(entity);
} else {
- for (String agentIp : agentIpList) {
- List<StreamSourceEntity> sourceEntityList =
sourceMapper.selectByAgentIp(agentIp);
- entityList.addAll(sourceEntityList);
+ if (agentIpList == null) {
+ throw new BusinessException("Agent ip list can not null");
+ }
+ if (CollectionUtils.isEmpty(agentIpList)) {
+ entityList =
sourceMapper.selectByRelatedId(request.getGroupId(), null, null);
+ } else {
+ for (String agentIp : agentIpList) {
+ List<StreamSourceEntity> sourceEntityList =
sourceMapper.selectByAgentIp(agentIp);
+ entityList.addAll(sourceEntityList);
+ }
}
}
for (StreamSourceEntity sourceEntity : entityList) {
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java
index fa10f8081f..faf797dc7a 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java
@@ -114,10 +114,12 @@ public class FileSourceOperator extends
AbstractSourceOperator {
try {
List<StreamSourceEntity> dataAddTaskList =
sourceMapper.selectByTaskMapId(sourceEntity.getId());
FileSourceDTO dto =
FileSourceDTO.getFromJson(sourceEntity.getExtParams());
- dto.setStartTime(sourceRequest.getStartTime());
- dto.setEndTime(sourceRequest.getEndTime());
+ dto.setDataTimeFrom(sourceRequest.getDataTimeFrom());
+ dto.setDataTimeTo(sourceRequest.getDataTimeTo());
dto.setRetry(true);
- dto.setAuditVersion(request.getAuditVersion());
+ if (request.getIncreaseAuditVersion()) {
+ dto.setAuditVersion(request.getAuditVersion());
+ }
dto.setFilterStreams(sourceRequest.getFilterStreams());
StreamSourceEntity dataAddTaskEntity =
CommonBeanUtils.copyProperties(sourceEntity,
StreamSourceEntity::new);