This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 94728b6676 [INLONG-8995][Manager] Add an interface for querying used
task information for agent (#8996)
94728b6676 is described below
commit 94728b66760364d10cf38945fd6b1d3aad828de3
Author: fuweng11 <[email protected]>
AuthorDate: Wed Sep 27 20:50:02 2023 +0800
[INLONG-8995][Manager] Add an interface for querying used task information
for agent (#8996)
---
.../inlong/common/pojo/agent/DataConfig.java | 1 +
.../inlong/manager/common/enums/SourceStatus.java | 10 ++++
.../manager/pojo/source/file/FileSource.java | 12 +++++
.../manager/pojo/source/file/FileSourceDTO.java | 12 +++++
.../pojo/source/file/FileSourceRequest.java | 12 +++++
.../inlong/manager/service/core/AgentService.java | 2 +
.../service/core/impl/AgentServiceImpl.java | 59 ++++++++++++++++++++++
.../web/controller/openapi/AgentController.java | 6 +++
8 files changed, 114 insertions(+)
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java
index e71f132b58..975f74d128 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java
@@ -47,6 +47,7 @@ public class DataConfig {
private String snapshot;
private Integer syncSend;
private String syncPartitionKey;
+ private Integer state;
private String extParams;
/**
* The task version.
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceStatus.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceStatus.java
index 306a9e60cc..35c9f1aac4 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceStatus.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceStatus.java
@@ -84,6 +84,16 @@ public enum SourceStatus {
TO_BE_ISSUED_BACKTRACK, TO_BE_ISSUED_STOP, TO_BE_ISSUED_ACTIVE,
TO_BE_ISSUED_CHECK, TO_BE_ISSUED_REDO_METRIC, TO_BE_ISSUED_MAKEUP);
+ /**
+  * Statuses grouped as "normal": SOURCE_NORMAL together with the ADD, RETRY, BACKTRACK,
+  * ACTIVE, CHECK, REDO_METRIC and MAKEUP operations in both their TO_BE_ISSUED and
+  * BEEN_ISSUED forms.
+  * NOTE(review): the set is built with Sets.newHashSet, so it is mutable although public.
+  */
+ public static final Set<SourceStatus> NORMAL_STATUS_SET = Sets.newHashSet(
+ SOURCE_NORMAL, TO_BE_ISSUED_ADD, TO_BE_ISSUED_RETRY,
+ TO_BE_ISSUED_BACKTRACK, TO_BE_ISSUED_ACTIVE, TO_BE_ISSUED_CHECK,
+ TO_BE_ISSUED_REDO_METRIC, TO_BE_ISSUED_MAKEUP, BEEN_ISSUED_ADD,
+ BEEN_ISSUED_RETRY, BEEN_ISSUED_BACKTRACK, BEEN_ISSUED_ACTIVE,
+ BEEN_ISSUED_CHECK, BEEN_ISSUED_REDO_METRIC, BEEN_ISSUED_MAKEUP);
+
+ /**
+  * Statuses grouped as "stopped": SOURCE_STOP, SOURCE_FAILED, TO_BE_ISSUED_STOP and
+  * BEEN_ISSUED_STOP.
+  * NOTE(review): the set is built with Sets.newHashSet, so it is mutable although public.
+  */
+ public static final Set<SourceStatus> STOP_STATUS_SET =
Sets.newHashSet(SOURCE_STOP, SOURCE_FAILED,
+ TO_BE_ISSUED_STOP, BEEN_ISSUED_STOP);
+
private static final Map<SourceStatus, Set<SourceStatus>>
SOURCE_STATE_AUTOMATON = Maps.newHashMap();
static {
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSource.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSource.java
index 51d7d01d3b..7036473798 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSource.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSource.java
@@ -77,6 +77,18 @@ public class FileSource extends StreamSource {
+ " Json format, set this parameter to json ")
private String dataContentStyle;
+ @ApiModelProperty("Cycle unit")
+ private String cycleUnit;
+
+ @ApiModelProperty("Whether retry")
+ private Boolean retry;
+
+ @ApiModelProperty("Start time")
+ private Long startTime;
+
+ @ApiModelProperty("End time")
+ private Long endTime;
+
@ApiModelProperty("Metadata filters by label, special parameters for K8S")
private Map<String, String> filterMetaByLabels;
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceDTO.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceDTO.java
index f3a52f4f85..19b7018b2a 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceDTO.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceDTO.java
@@ -80,6 +80,18 @@ public class FileSourceDTO {
@ApiModelProperty("Column separator of data source ")
private String dataSeparator;
+ @ApiModelProperty("Cycle unit")
+ private String cycleUnit = "D";
+
+ @ApiModelProperty("Whether retry")
+ private Boolean retry = false;
+
+ @ApiModelProperty("Start time")
+ private Long startTime = 0L;
+
+ @ApiModelProperty("End time")
+ private Long endTime = 0L;
+
@ApiModelProperty("Metadata filters by label, special parameters for K8S")
private Map<String, String> filterMetaByLabels;
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceRequest.java
index 822655813c..c27ec4e761 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceRequest.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceRequest.java
@@ -72,6 +72,18 @@ public class FileSourceRequest extends SourceRequest {
+ " Json format, set this parameter to json ")
private String dataContentStyle;
+ @ApiModelProperty("Cycle unit")
+ private String cycleUnit;
+
+ @ApiModelProperty("Whether retry")
+ private Boolean retry;
+
+ @ApiModelProperty("Start time")
+ private Long startTime;
+
+ @ApiModelProperty("End time")
+ private Long endTime;
+
@ApiModelProperty("Metadata filters by label, special parameters for K8S")
private Map<String, String> filterMetaByLabels;
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AgentService.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AgentService.java
index 099ebca720..2fd782dbe0 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AgentService.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AgentService.java
@@ -50,6 +50,8 @@ public interface AgentService {
*/
TaskResult getTaskResult(TaskRequest request);
+ /**
+  * Get the config of the tasks that already exist for the requesting agent.
+  *
+  * @param request task request identifying the agent
+  * @return task result carrying the data configs and the command configs
+  */
+ TaskResult getExistTaskConfig(TaskRequest request);
+
/**
* Divide the agent into different groups, which collect different stream
source tasks.
*
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index b20d76a8c0..f8724045ac 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -58,10 +58,13 @@ import
org.apache.inlong.manager.pojo.source.file.FileSourceDTO;
import org.apache.inlong.manager.service.core.AgentService;
import org.apache.inlong.manager.service.source.SourceSnapshotOperator;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.gson.Gson;
+import lombok.Getter;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.StringUtils;
@@ -119,6 +122,10 @@ public class AgentServiceImpl implements AgentService {
new ArrayBlockingQueue<>(100),
new ThreadFactoryBuilder().setNameFormat("async-agent-%s").build(),
new CallerRunsPolicy());
+
+ @Getter
+ private LoadingCache<TaskRequest, List<StreamSourceEntity>> taskCache;
+
@Value("${source.update.enabled:false}")
private Boolean updateTaskTimeoutEnabled;
@Value("${source.update.before.seconds:60}")
@@ -149,6 +156,14 @@ public class AgentServiceImpl implements AgentService {
*/
@PostConstruct
private void startHeartbeatTask() {
+
+ // The expiry time of cluster info cache must be greater than
taskCache cache
+ // because the eviction handler needs to query cluster info cache
+ long expireTime = 10 * 5;
+ taskCache = Caffeine.newBuilder()
+ .expireAfterAccess(expireTime * 2L, TimeUnit.SECONDS)
+ .build(this::fetchTask);
+
if (updateTaskTimeoutEnabled) {
ThreadFactory factory = new ThreadFactoryBuilder()
.setNameFormat("scheduled-source-timeout-%d")
@@ -267,6 +282,32 @@ public class AgentServiceImpl implements AgentService {
return
TaskResult.builder().dataConfigs(tasks).cmdConfigs(cmdConfigs).build();
}
+    /**
+     * Build the config of every task that already exists for the requesting agent.
+     *
+     * <p>The source entities come from {@code taskCache}; each one is converted with
+     * {@code getDataConfig} using the operation derived from its status by {@code getOp}.
+     *
+     * @param request task request identifying the agent
+     * @return task result holding the data configs (possibly empty) and the command configs
+     * @throws BusinessException if querying the commands or converting a source fails
+     */
+    @Override
+    public TaskResult getExistTaskConfig(TaskRequest request) {
+        LOGGER.debug("begin to get all exist task by request={}", request);
+        // The cache lookup happens before the try block, so a failure here is not wrapped below
+        List<StreamSourceEntity> cachedSources = taskCache.get(request);
+        List<DataConfig> existTasks = Lists.newArrayList();
+        try {
+            // Query pending special commands
+            List<CmdConfig> pendingCmds = getAgentCmdConfigs(request);
+            if (!CollectionUtils.isEmpty(cachedSources)) {
+                for (StreamSourceEntity source : cachedSources) {
+                    int op = getOp(source.getStatus());
+                    existTasks.add(getDataConfig(source, op));
+                }
+            }
+            return TaskResult.builder().dataConfigs(existTasks).cmdConfigs(pendingCmds).build();
+        } catch (Exception e) {
+            LOGGER.error("get all exist task failed:", e);
+            throw new BusinessException("get all exist task failed:" + e.getMessage());
+        }
+    }
+
@Override
@Transactional(rollbackFor = Throwable.class, isolation =
Isolation.READ_COMMITTED, propagation = Propagation.REQUIRES_NEW)
public Boolean bindGroup(AgentClusterNodeBindGroupRequest request) {
@@ -552,6 +593,8 @@ public class AgentServiceImpl implements AgentService {
InlongStreamEntity streamEntity =
streamMapper.selectByIdentifier(groupId, streamId);
String extParams = entity.getExtParams();
if (groupEntity != null && streamEntity != null) {
+ dataConfig.setState(
+
SourceStatus.NORMAL_STATUS_SET.contains(SourceStatus.forCode(entity.getStatus()))
? 1 : 0);
dataConfig.setSyncSend(streamEntity.getSyncSend());
if (SourceType.FILE.equalsIgnoreCase(streamEntity.getDataType())) {
String dataSeparator = streamEntity.getDataSeparator();
@@ -683,4 +726,20 @@ public class AgentServiceImpl implements AgentService {
return sourceGroups.stream().anyMatch(clusterNodeGroups::contains);
}
+    /**
+     * Loader used by {@code taskCache}: queries {@code sourceMapper} for the sources matching
+     * the agent of the given request whose status is in NORMAL_STATUS_SET or STOP_STATUS_SET.
+     *
+     * @param request task request supplying the cluster name, agent IP and UUID used as filters
+     * @return the sources in a normal status followed by the sources in a stop status
+     */
+    private List<StreamSourceEntity> fetchTask(TaskRequest request) {
+        final String cluster = request.getClusterName();
+        final String agentIp = request.getAgentIp();
+        final String agentUuid = request.getUuid();
+
+        // Sources in a normal status come first in the result
+        List<StreamSourceEntity> cachedTasks = new ArrayList<>(sourceMapper.selectByStatusAndCluster(
+                SourceStatus.NORMAL_STATUS_SET.stream()
+                        .map(SourceStatus::getCode)
+                        .collect(Collectors.toList()),
+                cluster, agentIp, agentUuid));
+
+        // Sources in a stop status are appended after them
+        cachedTasks.addAll(sourceMapper.selectByStatusAndCluster(
+                SourceStatus.STOP_STATUS_SET.stream()
+                        .map(SourceStatus::getCode)
+                        .collect(Collectors.toList()),
+                cluster, agentIp, agentUuid));
+
+        LOGGER.debug("success to add task : {}", cachedTasks.size());
+        return cachedTasks;
+    }
+
}
diff --git
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/AgentController.java
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/AgentController.java
index cded7e3962..77011bd2e1 100644
---
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/AgentController.java
+++
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/AgentController.java
@@ -70,6 +70,12 @@ public class AgentController {
return Response.success(agentService.getTaskResult(request));
}
+    /**
+     * Return the config of the tasks that already exist for the requesting agent.
+     *
+     * @param request task request sent in the request body
+     * @return successful response wrapping the task result produced by the agent service
+     */
+    @PostMapping("/agent/getExistTaskConfig")
+    @ApiOperation(value = "Get all exist task config")
+    public Response<TaskResult> getExistTaskConfig(@RequestBody TaskRequest request) {
+        TaskResult existTaskConfig = agentService.getExistTaskConfig(request);
+        return Response.success(existTaskConfig);
+    }
+
@PostMapping("/agent/bindGroup")
@ApiOperation(value = "Divide the agent into different groups, which
collect different stream source tasks.")
public Response<Boolean> bindGroup(@RequestBody
AgentClusterNodeBindGroupRequest request) {