This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new d51687fac [INLONG-5615][Manager] Add template id and sub task status
for file source (#5616)
d51687fac is described below
commit d51687facde131a0a0e96af9fff8951d7e9a1fc1
Author: woofyzhao <[email protected]>
AuthorDate: Mon Aug 22 19:09:34 2022 +0800
[INLONG-5615][Manager] Add template id and sub task status for file source
(#5616)
---
.../inlong/manager/client/cli/ListCommand.java | 2 +-
.../inlong/manager/client/cli/util/PrintUtils.java | 2 +-
.../manager/client/cli/validator/GroupStatus.java | 2 +-
.../inlong/manager/client/api/InlongClient.java | 4 +-
.../manager/client/api/InlongGroupContext.java | 4 +-
.../manager/client/api/impl/InlongClientImpl.java | 16 ++++--
.../manager/client/api/impl/InlongGroupImpl.java | 2 +-
.../client/api/inner/client/InlongGroupClient.java | 2 +-
.../manager/common}/enums/SimpleGroupStatus.java | 4 +-
.../manager/common}/enums/SimpleSourceStatus.java | 4 +-
.../manager/dao/entity/StreamSourceEntity.java | 1 +
.../dao/mapper/StreamSourceEntityMapper.java | 5 ++
.../resources/mappers/StreamSourceEntityMapper.xml | 19 +++++--
.../manager/pojo/group/InlongGroupStatusInfo.java | 53 ++++++++++++++++++
.../inlong/manager/pojo/source/StreamSource.java | 4 ++
.../inlong/manager/pojo/source/SubSourceDTO.java | 64 ++++++++++++++++++++++
.../service/core/impl/AgentServiceImpl.java | 4 +-
.../service/source/file/FileSourceOperator.java | 14 +++++
.../main/resources/h2/apache_inlong_manager.sql | 1 +
.../manager-web/sql/apache_inlong_manager.sql | 1 +
20 files changed, 183 insertions(+), 25 deletions(-)
diff --git
a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/ListCommand.java
b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/ListCommand.java
index 84e239fab..84a259a96 100644
---
a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/ListCommand.java
+++
b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/ListCommand.java
@@ -20,7 +20,7 @@ package org.apache.inlong.manager.client.cli;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
import com.github.pagehelper.PageInfo;
-import org.apache.inlong.manager.client.api.enums.SimpleGroupStatus;
+import org.apache.inlong.manager.common.enums.SimpleGroupStatus;
import org.apache.inlong.manager.client.api.inner.client.InlongGroupClient;
import org.apache.inlong.manager.client.api.inner.client.InlongStreamClient;
import org.apache.inlong.manager.client.api.inner.client.StreamSinkClient;
diff --git
a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/util/PrintUtils.java
b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/util/PrintUtils.java
index 945abaa43..0627e732f 100644
---
a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/util/PrintUtils.java
+++
b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/util/PrintUtils.java
@@ -21,7 +21,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.manager.client.api.enums.SimpleGroupStatus;
+import org.apache.inlong.manager.common.enums.SimpleGroupStatus;
import java.lang.reflect.Field;
import java.text.SimpleDateFormat;
diff --git
a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/validator/GroupStatus.java
b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/validator/GroupStatus.java
index 70b72aa78..9de4bb46a 100644
---
a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/validator/GroupStatus.java
+++
b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/validator/GroupStatus.java
@@ -20,7 +20,7 @@ package org.apache.inlong.manager.client.cli.validator;
import com.beust.jcommander.IParameterValidator;
import com.beust.jcommander.ParameterException;
import org.apache.commons.lang3.EnumUtils;
-import org.apache.inlong.manager.client.api.enums.SimpleGroupStatus;
+import org.apache.inlong.manager.common.enums.SimpleGroupStatus;
/**
* Class for inlong group status verification.
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongClient.java
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongClient.java
index e1c26e971..801b66523 100644
---
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongClient.java
+++
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongClient.java
@@ -18,7 +18,6 @@
package org.apache.inlong.manager.client.api;
import com.github.pagehelper.PageInfo;
-import org.apache.inlong.manager.client.api.enums.SimpleGroupStatus;
import org.apache.inlong.manager.client.api.impl.InlongClientImpl;
import org.apache.inlong.manager.pojo.cluster.BindTagRequest;
import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
@@ -30,6 +29,7 @@ import
org.apache.inlong.manager.pojo.cluster.ClusterTagPageRequest;
import org.apache.inlong.manager.pojo.cluster.ClusterTagRequest;
import org.apache.inlong.manager.pojo.cluster.ClusterTagResponse;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.pojo.group.InlongGroupStatusInfo;
import java.util.List;
import java.util.Map;
@@ -101,7 +101,7 @@ public interface InlongClient {
* @return map of inlong group status list
* @throws Exception the exception
*/
- Map<String, SimpleGroupStatus> listGroupStatus(List<String> groupIds)
throws Exception;
+ Map<String, InlongGroupStatusInfo> listGroupStatus(List<String> groupIds)
throws Exception;
/**
* Gets group.
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupContext.java
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupContext.java
index 14fc38c76..c930d4846 100644
---
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupContext.java
+++
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupContext.java
@@ -23,8 +23,8 @@ import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
-import org.apache.inlong.manager.client.api.enums.SimpleGroupStatus;
-import org.apache.inlong.manager.client.api.enums.SimpleSourceStatus;
+import org.apache.inlong.manager.common.enums.SimpleGroupStatus;
+import org.apache.inlong.manager.common.enums.SimpleSourceStatus;
import org.apache.inlong.manager.client.api.inner.InnerGroupContext;
import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
index 9ab335cb0..c373bfb11 100644
---
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
+++
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
@@ -29,8 +29,8 @@ import
org.apache.inlong.manager.client.api.ClientConfiguration;
import org.apache.inlong.manager.client.api.InlongClient;
import org.apache.inlong.manager.client.api.InlongCluster;
import org.apache.inlong.manager.client.api.InlongGroup;
-import org.apache.inlong.manager.client.api.enums.SimpleGroupStatus;
-import org.apache.inlong.manager.client.api.enums.SimpleSourceStatus;
+import org.apache.inlong.manager.common.enums.SimpleGroupStatus;
+import org.apache.inlong.manager.common.enums.SimpleSourceStatus;
import org.apache.inlong.manager.client.api.inner.client.ClientFactory;
import org.apache.inlong.manager.client.api.inner.client.InlongClusterClient;
import org.apache.inlong.manager.client.api.inner.client.InlongGroupClient;
@@ -47,6 +47,7 @@ import
org.apache.inlong.manager.pojo.cluster.ClusterTagResponse;
import org.apache.inlong.manager.pojo.group.InlongGroupBriefInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupPageRequest;
+import org.apache.inlong.manager.pojo.group.InlongGroupStatusInfo;
import org.apache.inlong.manager.pojo.source.StreamSource;
import org.apache.inlong.manager.common.util.HttpUtils;
import org.apache.inlong.manager.common.util.Preconditions;
@@ -121,21 +122,26 @@ public class InlongClientImpl implements InlongClient {
}
@Override
- public Map<String, SimpleGroupStatus> listGroupStatus(List<String>
groupIds) {
+ public Map<String, InlongGroupStatusInfo> listGroupStatus(List<String>
groupIds) {
InlongGroupPageRequest request = new InlongGroupPageRequest();
request.setGroupIdList(groupIds);
request.setListSources(true);
PageInfo<InlongGroupBriefInfo> pageInfo =
groupClient.listGroups(request);
List<InlongGroupBriefInfo> briefInfos = pageInfo.getList();
- Map<String, SimpleGroupStatus> groupStatusMap = Maps.newHashMap();
+ Map<String, InlongGroupStatusInfo> groupStatusMap = Maps.newHashMap();
if (CollectionUtils.isNotEmpty(briefInfos)) {
briefInfos.forEach(briefInfo -> {
String groupId = briefInfo.getInlongGroupId();
SimpleGroupStatus groupStatus =
SimpleGroupStatus.parseStatusByCode(briefInfo.getStatus());
List<StreamSource> sources = briefInfo.getStreamSources();
groupStatus = recheckGroupStatus(groupStatus, sources);
- groupStatusMap.put(groupId, groupStatus);
+ InlongGroupStatusInfo statusInfo =
InlongGroupStatusInfo.builder()
+ .inlongGroupId(briefInfo.getInlongGroupId())
+ .originalStatus(briefInfo.getStatus())
+ .simpleGroupStatus(groupStatus)
+ .streamSources(sources).build();
+ groupStatusMap.put(groupId, statusInfo);
});
}
return groupStatusMap;
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
index 857680fff..403ff81b2 100644
---
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
+++
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
@@ -24,7 +24,7 @@ import org.apache.inlong.manager.client.api.InlongGroup;
import org.apache.inlong.manager.client.api.InlongGroupContext;
import org.apache.inlong.manager.client.api.InlongStream;
import org.apache.inlong.manager.client.api.InlongStreamBuilder;
-import org.apache.inlong.manager.client.api.enums.SimpleGroupStatus;
+import org.apache.inlong.manager.common.enums.SimpleGroupStatus;
import org.apache.inlong.manager.client.api.inner.InnerGroupContext;
import org.apache.inlong.manager.client.api.inner.client.ClientFactory;
import org.apache.inlong.manager.client.api.inner.client.InlongGroupClient;
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java
index f5569ad50..148e9b8cc 100644
---
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java
+++
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java
@@ -21,7 +21,7 @@ import com.github.pagehelper.PageInfo;
import lombok.SneakyThrows;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.inlong.manager.client.api.ClientConfiguration;
-import org.apache.inlong.manager.client.api.enums.SimpleGroupStatus;
+import org.apache.inlong.manager.common.enums.SimpleGroupStatus;
import org.apache.inlong.manager.client.api.service.InlongGroupApi;
import org.apache.inlong.manager.client.api.util.ClientUtils;
import org.apache.inlong.manager.pojo.common.Response;
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/enums/SimpleGroupStatus.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SimpleGroupStatus.java
similarity index 97%
rename from
inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/enums/SimpleGroupStatus.java
rename to
inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SimpleGroupStatus.java
index 63a017488..29edeec89 100644
---
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/enums/SimpleGroupStatus.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SimpleGroupStatus.java
@@ -15,9 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.client.api.enums;
-
-import org.apache.inlong.manager.common.enums.GroupStatus;
+package org.apache.inlong.manager.common.enums;
import java.util.ArrayList;
import java.util.List;
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/enums/SimpleSourceStatus.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SimpleSourceStatus.java
similarity index 94%
rename from
inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/enums/SimpleSourceStatus.java
rename to
inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SimpleSourceStatus.java
index 53ed23e36..175b3ce49 100644
---
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/enums/SimpleSourceStatus.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SimpleSourceStatus.java
@@ -15,9 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.client.api.enums;
-
-import org.apache.inlong.manager.common.enums.SourceStatus;
+package org.apache.inlong.manager.common.enums;
/**
* The simple stream source status, more readable for users
diff --git
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceEntity.java
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceEntity.java
index 5a0047c36..2d7a1aa9d 100644
---
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceEntity.java
+++
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceEntity.java
@@ -34,6 +34,7 @@ public class StreamSourceEntity implements Serializable {
private String inlongStreamId;
private String sourceType;
private String sourceName;
+ private Integer templateId;
private String agentIp;
private String uuid;
diff --git
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
index a55c26f4d..eabd61220 100644
---
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
+++
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
@@ -93,6 +93,11 @@ public interface StreamSourceEntityMapper {
*/
List<StreamSourceEntity> selectByGroupIds(@Param("groupIdList")
List<String> groupIdList);
+ /**
+ * Select all sub sources by template id
+ */
+ List<StreamSourceEntity> selectByTemplateId(@Param("templateId") Integer
templateId);
+
/**
* Get the distinct source type from the given groupId and streamId
*/
diff --git
a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
index 6c5d36be9..32f8c56ef 100644
---
a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
+++
b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
@@ -26,6 +26,7 @@
<result column="inlong_stream_id" jdbcType="VARCHAR"
property="inlongStreamId"/>
<result column="source_type" jdbcType="VARCHAR" property="sourceType"/>
<result column="source_name" jdbcType="VARCHAR" property="sourceName"/>
+ <result column="template_id" jdbcType="INTEGER" property="templateId"/>
<result column="agent_ip" jdbcType="VARCHAR" property="agentIp"/>
<result column="uuid" jdbcType="VARCHAR" property="uuid"/>
<result column="data_node_name" jdbcType="VARCHAR"
property="dataNodeName"/>
@@ -44,7 +45,7 @@
<result column="modify_time" jdbcType="TIMESTAMP"
property="modifyTime"/>
</resultMap>
<sql id="Base_Column_List">
- id, inlong_group_id, inlong_stream_id, source_type, source_name,
agent_ip, uuid,
+ id, inlong_group_id, inlong_stream_id, source_type, source_name,
template_id, agent_ip, uuid,
data_node_name, inlong_cluster_name, serialization_type, snapshot,
report_time, ext_params,
version, status, previous_status, is_deleted, creator, modifier,
create_time, modify_time
</sql>
@@ -52,14 +53,15 @@
<insert id="insert" useGeneratedKeys="true" keyProperty="id"
parameterType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
insert into stream_source (inlong_group_id, inlong_stream_id,
- source_type, source_name, agent_ip,
+ source_type, source_name, template_id,
agent_ip,
uuid, data_node_name, inlong_cluster_name,
serialization_type, snapshot,
report_time, ext_params, status,
previous_status, creator, modifier)
values (#{inlongGroupId,jdbcType=VARCHAR},
#{inlongStreamId,jdbcType=VARCHAR},
- #{sourceType,jdbcType=VARCHAR},
#{sourceName,jdbcType=VARCHAR}, #{agentIp,jdbcType=VARCHAR},
- #{uuid,jdbcType=VARCHAR}, #{dataNodeName,jdbcType=VARCHAR},
#{inlongClusterName,jdbcType=VARCHAR},
+ #{sourceType,jdbcType=VARCHAR},
#{sourceName,jdbcType=VARCHAR}, #{templateId,jdbcType=INTEGER},
+ #{agentIp,jdbcType=VARCHAR},#{uuid,jdbcType=VARCHAR},
+ #{dataNodeName,jdbcType=VARCHAR},
#{inlongClusterName,jdbcType=VARCHAR},
#{serializationType,jdbcType=VARCHAR},
#{snapshot,jdbcType=LONGVARCHAR},
#{modifyTime,jdbcType=TIMESTAMP},
#{extParams,jdbcType=LONGVARCHAR}, #{status,jdbcType=INTEGER},
#{previousStatus,jdbcType=INTEGER},
#{creator,jdbcType=VARCHAR}, #{modifier,jdbcType=VARCHAR})
@@ -72,6 +74,13 @@
where id = #{id,jdbcType=INTEGER}
and is_deleted = 0
</select>
+ <select id="selectByTemplateId"
resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
+ select
+ <include refid="Base_Column_List"/>
+ from stream_source
+ where template_id = #{templateId,jdbcType=INTEGER}
+ and is_deleted = 0
+ </select>
<select id="selectByIdForUpdate"
resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
select
<include refid="Base_Column_List"/>
@@ -125,6 +134,7 @@
<if test="request.status != null and request.status != ''">
and status = #{request.status, jdbcType=INTEGER}
</if>
+ and template_id is NULL
</where>
<choose>
<when test="request.orderField != null and request.orderField !=
'' and request.orderType != null and request.orderType != ''">
@@ -244,6 +254,7 @@
#{item}
</foreach>
</if>
+ and template_id is NULL
</where>
</select>
<select id="selectSourceType" resultType="java.lang.String">
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupStatusInfo.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupStatusInfo.java
new file mode 100644
index 000000000..c0cf4e678
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupStatusInfo.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.group;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.common.enums.SimpleGroupStatus;
+import org.apache.inlong.manager.pojo.source.StreamSource;
+
+import java.util.List;
+
+/**
+ * Inlong group status info
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@ApiModel("Inlong group status info")
+public class InlongGroupStatusInfo {
+
+ @ApiModelProperty(value = "Inlong group id")
+ private String inlongGroupId;
+
+ @ApiModelProperty(value = "Inlong group original status")
+ private Integer originalStatus;
+
+ @ApiModelProperty(value = "Inlong group simple status")
+ private SimpleGroupStatus simpleGroupStatus;
+
+ @ApiModelProperty(value = "Stream sources in the inlong group")
+ private List<StreamSource> streamSources;
+
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/StreamSource.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/StreamSource.java
index 00a6a9d28..9c07d7969 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/StreamSource.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/StreamSource.java
@@ -31,6 +31,7 @@ import org.apache.inlong.manager.pojo.stream.StreamNode;
import java.util.Date;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
/**
@@ -103,6 +104,9 @@ public abstract class StreamSource extends StreamNode {
@Builder.Default
private Map<String, Object> properties = new LinkedHashMap<>();
+ @ApiModelProperty("Sub source information of existing agents")
+ private List<SubSourceDTO> subSourceList;
+
public SourceRequest genSourceRequest() {
return null;
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SubSourceDTO.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SubSourceDTO.java
new file mode 100644
index 000000000..372f0d71a
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SubSourceDTO.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.source;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * Sub source information data per agent
+ */
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+@Data
+public class SubSourceDTO {
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ @ApiModelProperty("stream source id")
+ private Integer id;
+
+ @ApiModelProperty("Template source id this sub source belongs to")
+ private Integer templateId;
+
+ @ApiModelProperty("Agent ip of sub source")
+ private String agentIp;
+
+ @ApiModelProperty("Status of sub source")
+ private Integer status;
+
+ public static SubSourceDTO getFromJson(@NotNull String extParams) {
+ try {
+
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
false);
+ return OBJECT_MAPPER.readValue(extParams, SubSourceDTO.class);
+ } catch (Exception e) {
+ throw new
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
+ }
+ }
+
+}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index 3b31262b4..94ec64d61 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -237,7 +237,8 @@ public class AgentServiceImpl implements AgentService {
// Cluster name is not blank, split task if necessary
// The agent ip field of the entity holds the ip list of the
agents that has already been issued
- if (StringUtils.isNotBlank(destClusterName) &&
destClusterName.equals(agentClusterName)) {
+ if (StringUtils.isNotBlank(destClusterName) &&
destClusterName.equals(agentClusterName)
+ && Objects.isNull(sourceEntity.getTemplateId())) {
// Is the task already fetched by this agent ?
if (StringUtils.isNotBlank(sourceEntity.getAgentIp())) {
@@ -256,6 +257,7 @@ public class AgentServiceImpl implements AgentService {
fileEntity.setAgentIp(agentIp);
fileEntity.setUuid(uuid);
fileEntity.setSourceName(fileEntity.getSourceName() + "-" +
RandomStringUtils.randomAlphanumeric(10));
+ fileEntity.setTemplateId(sourceEntity.getId());
int op = getOp(fileEntity.getStatus());
int nextStatus = getNextStatus(fileEntity.getStatus());
fileEntity.setStatus(nextStatus);
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java
index 0c2e5c23a..1eaa99740 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java
@@ -23,17 +23,20 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
+import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
import org.apache.inlong.manager.pojo.source.SourceRequest;
import org.apache.inlong.manager.pojo.source.StreamSource;
import org.apache.inlong.manager.pojo.source.file.FileSource;
import org.apache.inlong.manager.pojo.source.file.FileSourceDTO;
import org.apache.inlong.manager.pojo.source.file.FileSourceRequest;
+import org.apache.inlong.manager.pojo.source.SubSourceDTO;
import org.apache.inlong.manager.pojo.stream.StreamField;
import org.apache.inlong.manager.service.source.AbstractSourceOperator;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
+import java.util.stream.Collectors;
/**
* File source operator, such as get or set file source info.
@@ -44,6 +47,9 @@ public class FileSourceOperator extends
AbstractSourceOperator {
@Autowired
private ObjectMapper objectMapper;
+ @Autowired
+ private StreamSourceEntityMapper sourceMapper;
+
@Override
public Boolean accept(String sourceType) {
return SourceType.FILE.equals(sourceType);
@@ -79,6 +85,14 @@ public class FileSourceOperator extends
AbstractSourceOperator {
List<StreamField> sourceFields = super.getSourceFields(entity.getId());
source.setFieldList(sourceFields);
+
+ List<StreamSourceEntity> subSourceList =
sourceMapper.selectByTemplateId(entity.getId());
+ source.setSubSourceList(subSourceList.stream().map(subEntity ->
SubSourceDTO.builder()
+ .id(subEntity.getId())
+ .templateId(entity.getId())
+ .agentIp(subEntity.getAgentIp())
+ .status(subEntity.getStatus()).build())
+ .collect(Collectors.toList()));
return source;
}
diff --git
a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
index 53662cf3f..2d7753dfb 100644
---
a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
+++
b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
@@ -381,6 +381,7 @@ CREATE TABLE IF NOT EXISTS `stream_source`
`inlong_stream_id` varchar(256) NOT NULL COMMENT 'Inlong stream id',
`source_name` varchar(128) NOT NULL DEFAULT '' COMMENT
'source_name',
`source_type` varchar(20) DEFAULT '0' COMMENT 'Source
type, including: FILE, DB, etc',
+ `template_id` int(11) DEFAULT NULL COMMENT 'Id of the
template task this agent belongs to',
`agent_ip` varchar(40) DEFAULT NULL COMMENT 'Ip of
the agent running the task',
`uuid` varchar(30) DEFAULT NULL COMMENT 'Mac uuid
of the agent running the task',
`data_node_name` varchar(128) DEFAULT NULL COMMENT 'Node
name, which links to data_node table',
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index 1ae2850fa..0c249a2ff 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -402,6 +402,7 @@ CREATE TABLE IF NOT EXISTS `stream_source`
`inlong_stream_id` varchar(256) NOT NULL COMMENT 'Inlong stream id',
`source_name` varchar(128) NOT NULL DEFAULT '' COMMENT
'source_name',
`source_type` varchar(20) DEFAULT '0' COMMENT 'Source
type, including: FILE, DB, etc',
+ `template_id` int(11) DEFAULT NULL COMMENT 'Id of the
template task this agent belongs to',
`agent_ip` varchar(40) DEFAULT NULL COMMENT 'Ip of
the agent running the task',
`uuid` varchar(30) DEFAULT NULL COMMENT 'Mac uuid
of the agent running the task',
`data_node_name` varchar(128) DEFAULT NULL COMMENT 'Node
name, which links to data_node table',