This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 94728b6676 [INLONG-8995][Manager] Add an interface for querying used task information for agent (#8996)
94728b6676 is described below

commit 94728b66760364d10cf38945fd6b1d3aad828de3
Author: fuweng11 <[email protected]>
AuthorDate: Wed Sep 27 20:50:02 2023 +0800

    [INLONG-8995][Manager] Add an interface for querying used task information for agent (#8996)
---
 .../inlong/common/pojo/agent/DataConfig.java       |  1 +
 .../inlong/manager/common/enums/SourceStatus.java  | 10 ++++
 .../manager/pojo/source/file/FileSource.java       | 12 +++++
 .../manager/pojo/source/file/FileSourceDTO.java    | 12 +++++
 .../pojo/source/file/FileSourceRequest.java        | 12 +++++
 .../inlong/manager/service/core/AgentService.java  |  2 +
 .../service/core/impl/AgentServiceImpl.java        | 59 ++++++++++++++++++++++
 .../web/controller/openapi/AgentController.java    |  6 +++
 8 files changed, 114 insertions(+)

diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java
index e71f132b58..975f74d128 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java
@@ -47,6 +47,7 @@ public class DataConfig {
     private String snapshot;
     private Integer syncSend;
     private String syncPartitionKey;
+    private Integer state;
     private String extParams;
     /**
      * The task version.
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceStatus.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceStatus.java
index 306a9e60cc..35c9f1aac4 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceStatus.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceStatus.java
@@ -84,6 +84,16 @@ public enum SourceStatus {
             TO_BE_ISSUED_BACKTRACK, TO_BE_ISSUED_STOP, TO_BE_ISSUED_ACTIVE,
             TO_BE_ISSUED_CHECK, TO_BE_ISSUED_REDO_METRIC, TO_BE_ISSUED_MAKEUP);
 
+    public static final Set<SourceStatus> NORMAL_STATUS_SET = Sets.newHashSet(
+            SOURCE_NORMAL, TO_BE_ISSUED_ADD, TO_BE_ISSUED_RETRY,
+            TO_BE_ISSUED_BACKTRACK, TO_BE_ISSUED_ACTIVE, TO_BE_ISSUED_CHECK,
+            TO_BE_ISSUED_REDO_METRIC, TO_BE_ISSUED_MAKEUP, BEEN_ISSUED_ADD,
+            BEEN_ISSUED_RETRY, BEEN_ISSUED_BACKTRACK, BEEN_ISSUED_ACTIVE,
+            BEEN_ISSUED_CHECK, BEEN_ISSUED_REDO_METRIC, BEEN_ISSUED_MAKEUP);
+
+    public static final Set<SourceStatus> STOP_STATUS_SET = Sets.newHashSet(SOURCE_STOP, SOURCE_FAILED,
+            TO_BE_ISSUED_STOP, BEEN_ISSUED_STOP);
+
     private static final Map<SourceStatus, Set<SourceStatus>> SOURCE_STATE_AUTOMATON = Maps.newHashMap();
 
     static {
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSource.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSource.java
index 51d7d01d3b..7036473798 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSource.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSource.java
@@ -77,6 +77,18 @@ public class FileSource extends StreamSource {
             + "         Json format, set this parameter to json ")
     private String dataContentStyle;
 
+    @ApiModelProperty("Cycle unit")
+    private String cycleUnit;
+
+    @ApiModelProperty("Whether retry")
+    private Boolean retry;
+
+    @ApiModelProperty("Start time")
+    private Long startTime;
+
+    @ApiModelProperty("End time")
+    private Long endTime;
+
     @ApiModelProperty("Metadata filters by label, special parameters for K8S")
     private Map<String, String> filterMetaByLabels;
 
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceDTO.java
index f3a52f4f85..19b7018b2a 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceDTO.java
@@ -80,6 +80,18 @@ public class FileSourceDTO {
     @ApiModelProperty("Column separator of data source ")
     private String dataSeparator;
 
+    @ApiModelProperty("Cycle unit")
+    private String cycleUnit = "D";
+
+    @ApiModelProperty("Whether retry")
+    private Boolean retry = false;
+
+    @ApiModelProperty("Start time")
+    private Long startTime = 0L;
+
+    @ApiModelProperty("End time")
+    private Long endTime = 0L;
+
     @ApiModelProperty("Metadata filters by label, special parameters for K8S")
     private Map<String, String> filterMetaByLabels;
 
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceRequest.java
index 822655813c..c27ec4e761 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceRequest.java
@@ -72,6 +72,18 @@ public class FileSourceRequest extends SourceRequest {
             + "         Json format, set this parameter to json ")
     private String dataContentStyle;
 
+    @ApiModelProperty("Cycle unit")
+    private String cycleUnit;
+
+    @ApiModelProperty("Whether retry")
+    private Boolean retry;
+
+    @ApiModelProperty("Start time")
+    private Long startTime;
+
+    @ApiModelProperty("End time")
+    private Long endTime;
+
     @ApiModelProperty("Metadata filters by label, special parameters for K8S")
     private Map<String, String> filterMetaByLabels;
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AgentService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AgentService.java
index 099ebca720..2fd782dbe0 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AgentService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AgentService.java
@@ -50,6 +50,8 @@ public interface AgentService {
      */
     TaskResult getTaskResult(TaskRequest request);
 
+    TaskResult getExistTaskConfig(TaskRequest request);
+
     /**
      * Divide the agent into different groups, which collect different stream source tasks.
      *
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index b20d76a8c0..f8724045ac 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -58,10 +58,13 @@ import org.apache.inlong.manager.pojo.source.file.FileSourceDTO;
 import org.apache.inlong.manager.service.core.AgentService;
 import org.apache.inlong.manager.service.source.SourceSnapshotOperator;
 
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
 import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.gson.Gson;
+import lombok.Getter;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -119,6 +122,10 @@ public class AgentServiceImpl implements AgentService {
             new ArrayBlockingQueue<>(100),
             new ThreadFactoryBuilder().setNameFormat("async-agent-%s").build(),
             new CallerRunsPolicy());
+
+    @Getter
+    private LoadingCache<TaskRequest, List<StreamSourceEntity>> taskCache;
+
     @Value("${source.update.enabled:false}")
     private Boolean updateTaskTimeoutEnabled;
     @Value("${source.update.before.seconds:60}")
@@ -149,6 +156,14 @@ public class AgentServiceImpl implements AgentService {
      */
     @PostConstruct
     private void startHeartbeatTask() {
+
+        // The expiry time of cluster info cache must be greater than taskCache cache
+        // because the eviction handler needs to query cluster info cache
+        long expireTime = 10 * 5;
+        taskCache = Caffeine.newBuilder()
+                .expireAfterAccess(expireTime * 2L, TimeUnit.SECONDS)
+                .build(this::fetchTask);
+
         if (updateTaskTimeoutEnabled) {
             ThreadFactory factory = new ThreadFactoryBuilder()
                     .setNameFormat("scheduled-source-timeout-%d")
@@ -267,6 +282,32 @@ public class AgentServiceImpl implements AgentService {
         return TaskResult.builder().dataConfigs(tasks).cmdConfigs(cmdConfigs).build();
     }
 
+    @Override
+    public TaskResult getExistTaskConfig(TaskRequest request) {
+        LOGGER.debug("begin to get all exist task by request={}", request);
+        // Query pending special commands
+        List<DataConfig> runningTaskConfig = Lists.newArrayList();
+        List<StreamSourceEntity> sourceEntities = taskCache.get(request);
+        try {
+            List<CmdConfig> cmdConfigs = getAgentCmdConfigs(request);
+            if (CollectionUtils.isEmpty(sourceEntities)) {
+                return TaskResult.builder().dataConfigs(runningTaskConfig).cmdConfigs(cmdConfigs).build();
+            }
+            for (StreamSourceEntity sourceEntity : sourceEntities) {
+                int op = getOp(sourceEntity.getStatus());
+                DataConfig dataConfig = getDataConfig(sourceEntity, op);
+                runningTaskConfig.add(dataConfig);
+            }
+            TaskResult taskResult = TaskResult.builder().dataConfigs(runningTaskConfig).cmdConfigs(cmdConfigs).build();
+
+            return taskResult;
+        } catch (Exception e) {
+            LOGGER.error("get all exist task failed:", e);
+            throw new BusinessException("get all exist task failed:" + e.getMessage());
+        }
+
+    }
+
     @Override
     @Transactional(rollbackFor = Throwable.class, isolation = Isolation.READ_COMMITTED, propagation = Propagation.REQUIRES_NEW)
     public Boolean bindGroup(AgentClusterNodeBindGroupRequest request) {
@@ -552,6 +593,8 @@ public class AgentServiceImpl implements AgentService {
         InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(groupId, streamId);
         String extParams = entity.getExtParams();
         if (groupEntity != null && streamEntity != null) {
+            dataConfig.setState(
+                    SourceStatus.NORMAL_STATUS_SET.contains(SourceStatus.forCode(entity.getStatus())) ? 1 : 0);
             dataConfig.setSyncSend(streamEntity.getSyncSend());
             if (SourceType.FILE.equalsIgnoreCase(streamEntity.getDataType())) {
                 String dataSeparator = streamEntity.getDataSeparator();
@@ -683,4 +726,20 @@ public class AgentServiceImpl implements AgentService {
         return sourceGroups.stream().anyMatch(clusterNodeGroups::contains);
     }
 
+    private List<StreamSourceEntity> fetchTask(TaskRequest request) {
+        final String clusterName = request.getClusterName();
+        final String ip = request.getAgentIp();
+        final String uuid = request.getUuid();
+        List<StreamSourceEntity> normalSourceEntities = sourceMapper.selectByStatusAndCluster(
+                SourceStatus.NORMAL_STATUS_SET.stream().map(SourceStatus::getCode).collect(Collectors.toList()),
+                clusterName, ip, uuid);
+        List<StreamSourceEntity> taskLists = new ArrayList<>(normalSourceEntities);
+        List<StreamSourceEntity> stopSourceEntities = sourceMapper.selectByStatusAndCluster(
+                SourceStatus.STOP_STATUS_SET.stream().map(SourceStatus::getCode).collect(Collectors.toList()),
+                clusterName, ip, uuid);
+        taskLists.addAll(stopSourceEntities);
+        LOGGER.debug("success to add task : {}", taskLists.size());
+        return taskLists;
+    }
+
 }
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/AgentController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/AgentController.java
index cded7e3962..77011bd2e1 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/AgentController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/AgentController.java
@@ -70,6 +70,12 @@ public class AgentController {
         return Response.success(agentService.getTaskResult(request));
     }
 
+    @PostMapping("/agent/getExistTaskConfig")
+    @ApiOperation(value = "Get all exist task config")
+    public Response<TaskResult> getExistTaskConfig(@RequestBody TaskRequest request) {
+        return Response.success(agentService.getExistTaskConfig(request));
+    }
+
     @PostMapping("/agent/bindGroup")
     @ApiOperation(value = "Divide the agent into different groups, which collect different stream source tasks.")
     public Response<Boolean> bindGroup(@RequestBody AgentClusterNodeBindGroupRequest request) {

Reply via email to