This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new f1395eb0d8 [INLONG-9297][Manager] Support configuring multiple sink
types of tasks under a single stream (#9298)
f1395eb0d8 is described below
commit f1395eb0d8bd9f6b4d5f4fd3fa322422f2c27aaa
Author: fuweng11 <[email protected]>
AuthorDate: Tue Nov 28 20:11:20 2023 +0800
[INLONG-9297][Manager] Support configuring multiple sink types of tasks
under a single stream (#9298)
---
.../inlong/manager/common/consts/SinkType.java | 61 +++++++-
.../consts/{StreamType.java => SortType.java} | 13 +-
.../inlong/manager/common/consts/StreamType.java | 13 ++
.../{StreamType.java => SupportSortType.java} | 23 +--
.../inlong/manager/common/enums/GroupStatus.java | 3 +-
.../plugin/listener/DeleteSortListener.java | 90 ++++++-----
.../plugin/listener/RestartSortListener.java | 123 ++++++++-------
.../plugin/listener/StartupSortListener.java | 125 ++++++++-------
.../plugin/listener/StartupStreamListener.java | 123 ++++++++++++++-
.../plugin/listener/SuspendSortListener.java | 93 ++++++-----
.../manager/plugin/poller/SortStatusPoller.java | 31 ++--
.../inlong/manager/pojo/sort/SortStatusInfo.java | 3 +
.../manager/service/core/impl/SortServiceImpl.java | 12 +-
.../service/group/InlongGroupServiceImpl.java | 12 +-
.../service/listener/sort/SortConfigListener.java | 17 +-
.../listener/sort/StreamSortConfigListener.java | 31 ++--
.../resource/sort/DefaultSortConfigOperator.java | 173 +++++++++++----------
.../service/resource/sort/SortConfigOperator.java | 8 +-
.../resource/sort/SortConfigOperatorFactory.java | 13 +-
.../resources/application-unit-test.properties | 2 +
.../manager/workflow/plugin/sort/SortPoller.java | 6 +-
21 files changed, 637 insertions(+), 338 deletions(-)
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
index f0b215ac30..1d53c71fc2 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
@@ -17,26 +17,85 @@
package org.apache.inlong.manager.common.consts;
+import java.lang.reflect.Field;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+
/**
* Constants of sink type.
*/
public class SinkType extends StreamType {
+ @SupportSortType(sortType = SortType.SORT_FLINK)
public static final String HIVE = "HIVE";
+
+ @SupportSortType(sortType = SortType.SORT_FLINK)
public static final String CLICKHOUSE = "CLICKHOUSE";
+
+ @SupportSortType(sortType = SortType.SORT_FLINK)
public static final String HBASE = "HBASE";
- public static final String ELASTICSEARCH = "ES";
+
+ @SupportSortType(sortType = SortType.SORT_STANDALONE)
+ public static final String ELASTICSEARCH = "ELASTICSEARCH";
+
+ @SupportSortType(sortType = SortType.SORT_FLINK)
public static final String HDFS = "HDFS";
+
+ @SupportSortType(sortType = SortType.SORT_FLINK)
public static final String GREENPLUM = "GREENPLUM";
+
+ @SupportSortType(sortType = SortType.SORT_FLINK)
public static final String MYSQL = "MYSQL";
+
+ @SupportSortType(sortType = SortType.SORT_FLINK)
public static final String TDSQLPOSTGRESQL = "TDSQLPOSTGRESQL";
+
+ @SupportSortType(sortType = SortType.SORT_FLINK)
public static final String DORIS = "DORIS";
+
+ @SupportSortType(sortType = SortType.SORT_FLINK)
public static final String STARROCKS = "STARROCKS";
+
+ @SupportSortType(sortType = SortType.SORT_FLINK)
public static final String KUDU = "KUDU";
+
+ @SupportSortType(sortType = SortType.SORT_FLINK)
public static final String REDIS = "REDIS";
+
+ @SupportSortType(sortType = SortType.SORT_FLINK)
+ public static final String TUBEMQ = "TUBEMQ";
+
/**
* Tencent cloud log service
* Details: <a href="https://www.tencentcloud.com/products/cls">CLS</a>
*/
+ @SupportSortType(sortType = SortType.SORT_STANDALONE)
public static final String CLS = "CLS";
+
+ public static final Set<String> SORT_FLINK_SINK = new HashSet<>();
+
+ public static final Set<String> SORT_STANDALONE_SINK = new HashSet<>();
+
+ static {
+ SinkType obj = new SinkType();
+ Class<? extends SinkType> clazz = obj.getClass();
+ Field[] fields = clazz.getFields();
+ for (Field field : fields) {
+ if (field.isAnnotationPresent(SupportSortType.class)) {
+ SupportSortType annotation =
field.getAnnotation(SupportSortType.class);
+ if (Objects.equals(annotation.sortType(),
SortType.SORT_STANDALONE)) {
+ SORT_STANDALONE_SINK.add(field.getName());
+ } else {
+ SORT_FLINK_SINK.add(field.getName());
+ }
+ }
+ }
+ }
+
+ public static boolean containSortFlinkSink(List<String> sinkTypes) {
+ return sinkTypes.stream().anyMatch(SORT_STANDALONE_SINK::contains);
+ }
+
}
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/StreamType.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SortType.java
similarity index 63%
copy from
inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/StreamType.java
copy to
inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SortType.java
index afbd57bc50..e40c2922e8 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/StreamType.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SortType.java
@@ -18,16 +18,11 @@
package org.apache.inlong.manager.common.consts;
/**
- * Constant for stream types, indicating that both StreamSource and StreamSink
support these types.
+ * Sort task type, including sort flink and sort standalone
*/
-public class StreamType {
+public enum SortType {
- public static final String KAFKA = "KAFKA";
- public static final String HUDI = "HUDI";
- public static final String POSTGRESQL = "POSTGRESQL";
- public static final String SQLSERVER = "SQLSERVER";
- public static final String ORACLE = "ORACLE";
- public static final String PULSAR = "PULSAR";
- public static final String ICEBERG = "ICEBERG";
+ SORT_FLINK,
+ SORT_STANDALONE
}
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/StreamType.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/StreamType.java
index afbd57bc50..f8f70dfe19 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/StreamType.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/StreamType.java
@@ -22,12 +22,25 @@ package org.apache.inlong.manager.common.consts;
*/
public class StreamType {
+ @SupportSortType(sortType = SortType.SORT_FLINK)
public static final String KAFKA = "KAFKA";
+
+ @SupportSortType(sortType = SortType.SORT_FLINK)
public static final String HUDI = "HUDI";
+
+ @SupportSortType(sortType = SortType.SORT_FLINK)
public static final String POSTGRESQL = "POSTGRESQL";
+
+ @SupportSortType(sortType = SortType.SORT_FLINK)
public static final String SQLSERVER = "SQLSERVER";
+
+ @SupportSortType(sortType = SortType.SORT_FLINK)
public static final String ORACLE = "ORACLE";
+
+ @SupportSortType(sortType = SortType.SORT_STANDALONE)
public static final String PULSAR = "PULSAR";
+
+ @SupportSortType(sortType = SortType.SORT_FLINK)
public static final String ICEBERG = "ICEBERG";
}
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/StreamType.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SupportSortType.java
similarity index 58%
copy from
inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/StreamType.java
copy to
inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SupportSortType.java
index afbd57bc50..00c9048aac 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/StreamType.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SupportSortType.java
@@ -17,17 +17,22 @@
package org.apache.inlong.manager.common.consts;
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
/**
- * Constant for stream types, indicating that both StreamSource and StreamSink
support these types.
+ * This annotation is used to indicate the type of inbound task used for
inbound operations, including sort flink and
+ * sort standalone. On the user's SinkType class field, this annotation is
used to identify which type of sort task each
+ * SinkType uses.
*/
-public class StreamType {
+@Retention(RetentionPolicy.RUNTIME)
+@Documented
+@Target({ElementType.FIELD})
+public @interface SupportSortType {
- public static final String KAFKA = "KAFKA";
- public static final String HUDI = "HUDI";
- public static final String POSTGRESQL = "POSTGRESQL";
- public static final String SQLSERVER = "SQLSERVER";
- public static final String ORACLE = "ORACLE";
- public static final String PULSAR = "PULSAR";
- public static final String ICEBERG = "ICEBERG";
+ SortType sortType();
}
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupStatus.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupStatus.java
index d0bb62f7bf..79e89ad53b 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupStatus.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupStatus.java
@@ -75,7 +75,8 @@ public enum GroupStatus {
GROUP_STATE_AUTOMATON.put(CONFIGURATION_OFFLINE,
Sets.newHashSet(CONFIGURATION_OFFLINE, CONFIG_ONLINE_ING,
CONFIG_DELETING));
- GROUP_STATE_AUTOMATON.put(CONFIG_ONLINE_ING,
Sets.newHashSet(CONFIG_ONLINE_ING, CONFIG_FAILED));
+ GROUP_STATE_AUTOMATON.put(CONFIG_ONLINE_ING,
+ Sets.newHashSet(CONFIG_ONLINE_ING, CONFIG_FAILED,
CONFIG_SUCCESSFUL));
GROUP_STATE_AUTOMATON.put(CONFIG_DELETING,
Sets.newHashSet(CONFIG_DELETING, CONFIG_DELETED, CONFIG_FAILED));
GROUP_STATE_AUTOMATON.put(CONFIG_DELETED,
Sets.newHashSet(CONFIG_DELETED));
diff --git
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
index d7f33fd77d..a7918c640a 100644
---
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
+++
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
@@ -24,8 +24,9 @@ import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.plugin.flink.FlinkOperation;
import org.apache.inlong.manager.plugin.flink.FlinkService;
import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
-import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
-import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import
org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
import org.apache.inlong.manager.workflow.WorkflowContext;
@@ -34,6 +35,7 @@ import
org.apache.inlong.manager.workflow.event.task.SortOperateListener;
import com.fasterxml.jackson.core.type.TypeReference;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.JobStatus;
@@ -84,46 +86,54 @@ public class DeleteSortListener implements
SortOperateListener {
}
GroupResourceProcessForm groupResourceProcessForm =
(GroupResourceProcessForm) processForm;
- InlongGroupInfo inlongGroupInfo =
groupResourceProcessForm.getGroupInfo();
- List<InlongGroupExtInfo> extList = inlongGroupInfo.getExtList();
- log.info("inlong group ext info: {}", extList);
-
- Map<String, String> kvConf = new HashMap<>();
- extList.forEach(groupExtInfo -> kvConf.put(groupExtInfo.getKeyName(),
groupExtInfo.getKeyValue()));
- String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
- if (StringUtils.isNotEmpty(sortExt)) {
- Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(
- JsonUtils.OBJECT_MAPPER.readTree(sortExt), new
TypeReference<Map<String, String>>() {
- });
- kvConf.putAll(result);
- }
-
- String jobId = kvConf.get(InlongConstants.SORT_JOB_ID);
- if (StringUtils.isBlank(jobId)) {
- String message = String.format("sort job id is empty for
groupId=%s", groupId);
- return ListenerResult.fail(message);
- }
-
- FlinkInfo flinkInfo = new FlinkInfo();
- flinkInfo.setJobId(jobId);
- String sortUrl = kvConf.get(InlongConstants.SORT_URL);
- flinkInfo.setEndpoint(sortUrl);
-
- FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint());
- FlinkOperation flinkOperation = new FlinkOperation(flinkService);
- try {
- flinkOperation.delete(flinkInfo);
- log.info("job delete success for jobId={}", jobId);
- } catch (Exception e) {
- flinkInfo.setException(true);
- flinkInfo.setExceptionMsg(getExceptionStackMsg(e));
+ List<InlongStreamInfo> streamInfos =
groupResourceProcessForm.getStreamInfos();
+ for (InlongStreamInfo streamInfo : streamInfos) {
+ List<StreamSink> sinkList = streamInfo.getSinkList();
+ if (CollectionUtils.isEmpty(sinkList)) {
+ continue;
+ }
+ List<InlongStreamExtInfo> extList = streamInfo.getExtList();
+ log.info("stream sink ext info: {}", extList);
+
+ Map<String, String> kvConf = new HashMap<>();
+ extList.forEach(v -> kvConf.put(v.getKeyName(), v.getKeyValue()));
+ String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
+ if (StringUtils.isNotEmpty(sortExt)) {
+ Map<String, String> result =
JsonUtils.OBJECT_MAPPER.convertValue(
+ JsonUtils.OBJECT_MAPPER.readTree(sortExt), new
TypeReference<Map<String, String>>() {
+ });
+ kvConf.putAll(result);
+ }
+
+ String jobId = kvConf.get(InlongConstants.SORT_JOB_ID);
+ if (StringUtils.isBlank(jobId)) {
+ String message = String.format("sort job id is empty for
groupId=%s, streamId=%s", groupId,
+ streamInfo.getInlongStreamId());
+ return ListenerResult.fail(message);
+ }
+
+ FlinkInfo flinkInfo = new FlinkInfo();
+ flinkInfo.setJobId(jobId);
+ String sortUrl = kvConf.get(InlongConstants.SORT_URL);
+ flinkInfo.setEndpoint(sortUrl);
+
+ FlinkService flinkService = new
FlinkService(flinkInfo.getEndpoint());
+ FlinkOperation flinkOperation = new FlinkOperation(flinkService);
+ try {
+ flinkOperation.delete(flinkInfo);
+ log.info("job delete success for jobId={}", jobId);
+ } catch (Exception e) {
+ flinkInfo.setException(true);
+ flinkInfo.setExceptionMsg(getExceptionStackMsg(e));
+ flinkOperation.pollJobStatus(flinkInfo, JobStatus.CANCELED);
+
+ String message = String.format("delete sort failed for
groupId=%s, streamId=%s", groupId,
+ streamInfo.getInlongStreamId());
+ log.error(message, e);
+ return ListenerResult.fail(message + ": " + e.getMessage());
+ }
flinkOperation.pollJobStatus(flinkInfo, JobStatus.CANCELED);
-
- String message = String.format("delete sort failed for
groupId=%s", groupId);
- log.error(message, e);
- return ListenerResult.fail(message + ": " + e.getMessage());
}
- flinkOperation.pollJobStatus(flinkInfo, JobStatus.CANCELED);
return ListenerResult.success();
}
diff --git
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
index c8a223e0af..66bf88b149 100644
---
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
+++
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
@@ -25,8 +25,9 @@ import org.apache.inlong.manager.plugin.flink.FlinkOperation;
import org.apache.inlong.manager.plugin.flink.FlinkService;
import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
import org.apache.inlong.manager.plugin.flink.enums.Constants;
-import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
-import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import
org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
import org.apache.inlong.manager.workflow.WorkflowContext;
@@ -35,6 +36,7 @@ import
org.apache.inlong.manager.workflow.event.task.SortOperateListener;
import com.fasterxml.jackson.core.type.TypeReference;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.JobStatus;
@@ -85,66 +87,79 @@ public class RestartSortListener implements
SortOperateListener {
}
GroupResourceProcessForm groupResourceProcessForm =
(GroupResourceProcessForm) processForm;
- InlongGroupInfo inlongGroupInfo =
groupResourceProcessForm.getGroupInfo();
- List<InlongGroupExtInfo> extList = inlongGroupInfo.getExtList();
- log.info("inlong group ext info: {}", extList);
-
- Map<String, String> kvConf = new HashMap<>();
- extList.forEach(groupExtInfo -> kvConf.put(groupExtInfo.getKeyName(),
groupExtInfo.getKeyValue()));
- String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
- if (StringUtils.isNotEmpty(sortExt)) {
- Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(
- JsonUtils.OBJECT_MAPPER.readTree(sortExt), new
TypeReference<Map<String, String>>() {
- });
- kvConf.putAll(result);
- }
-
- String jobId = kvConf.get(InlongConstants.SORT_JOB_ID);
- if (StringUtils.isBlank(jobId)) {
- String message = String.format("sort job id is empty for groupId
[%s]", groupId);
- return ListenerResult.fail(message);
- }
- String dataflow = kvConf.get(InlongConstants.DATAFLOW);
- if (StringUtils.isEmpty(dataflow)) {
- String message = String.format("dataflow is empty for groupId
[%s]", groupId);
- log.error(message);
- return ListenerResult.fail(message);
- }
- FlinkInfo flinkInfo = new FlinkInfo();
- String jobName = Constants.SORT_JOB_NAME_GENERATOR.apply(processForm);
- flinkInfo.setJobName(jobName);
- String sortUrl = kvConf.get(InlongConstants.SORT_URL);
- flinkInfo.setEndpoint(sortUrl);
-
- FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint());
- FlinkOperation flinkOperation = new FlinkOperation(flinkService);
- try {
- flinkOperation.genPath(flinkInfo, dataflow);
- // todo Currently, savepoint is not being used to restart, but
will be improved in the future
- flinkOperation.start(flinkInfo);
- log.info("job restart success for [{}]", jobId);
- } catch (Exception e) {
- flinkInfo.setException(true);
- flinkInfo.setExceptionMsg(getExceptionStackMsg(e));
+ List<InlongStreamInfo> streamInfos =
groupResourceProcessForm.getStreamInfos();
+ for (InlongStreamInfo streamInfo : streamInfos) {
+ List<StreamSink> sinkList = streamInfo.getSinkList();
+ if (CollectionUtils.isEmpty(sinkList)) {
+ continue;
+ }
+ List<InlongStreamExtInfo> extList = streamInfo.getExtList();
+ log.info("stream ext info: {}", extList);
+
+ Map<String, String> kvConf = new HashMap<>();
+ extList.forEach(v -> kvConf.put(v.getKeyName(), v.getKeyValue()));
+ String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
+ if (StringUtils.isNotEmpty(sortExt)) {
+ Map<String, String> result =
JsonUtils.OBJECT_MAPPER.convertValue(
+ JsonUtils.OBJECT_MAPPER.readTree(sortExt), new
TypeReference<Map<String, String>>() {
+ });
+ kvConf.putAll(result);
+ }
+
+ String jobId = kvConf.get(InlongConstants.SORT_JOB_ID);
+ if (StringUtils.isBlank(jobId)) {
+ String message = String.format("sort job id is empty for
groupId [%s], streamId [%s]", groupId,
+ streamInfo.getInlongStreamId());
+ return ListenerResult.fail(message);
+ }
+ String dataflow = kvConf.get(InlongConstants.DATAFLOW);
+ if (StringUtils.isEmpty(dataflow)) {
+ String message = String.format("dataflow is empty for groupId
[%s], streamId [%s]", groupId,
+ streamInfo.getInlongStreamId());
+ log.error(message);
+ return ListenerResult.fail(message);
+ }
+
+ FlinkInfo flinkInfo = new FlinkInfo();
+ String jobName =
Constants.SORT_JOB_NAME_GENERATOR.apply(processForm) +
streamInfo.getInlongStreamId();
+ flinkInfo.setJobName(jobName);
+ String sortUrl = kvConf.get(InlongConstants.SORT_URL);
+ flinkInfo.setEndpoint(sortUrl);
+
+ FlinkService flinkService = new
FlinkService(flinkInfo.getEndpoint());
+ FlinkOperation flinkOperation = new FlinkOperation(flinkService);
+ try {
+ flinkOperation.genPath(flinkInfo, dataflow);
+ // todo Currently, savepoint is not being used to restart, but
will be improved in the future
+ flinkOperation.start(flinkInfo);
+ log.info("job restart success for groupId = {}, streamId = {}
jobId = {}", groupId,
+ streamInfo.getInlongStreamId(), jobId);
+ } catch (Exception e) {
+ flinkInfo.setException(true);
+ flinkInfo.setExceptionMsg(getExceptionStackMsg(e));
+ flinkOperation.pollJobStatus(flinkInfo, JobStatus.RUNNING);
+
+ String message = String.format("restart sort failed for
groupId [%s], streamId [%s] ", groupId,
+ streamInfo.getInlongStreamId());
+ log.error(message, e);
+ return ListenerResult.fail(message + e.getMessage());
+ }
+ extList.forEach(groupExtInfo ->
kvConf.remove(InlongConstants.SORT_JOB_ID));
+ saveInfo(streamInfo, InlongConstants.SORT_JOB_ID,
flinkInfo.getJobId(), extList);
flinkOperation.pollJobStatus(flinkInfo, JobStatus.RUNNING);
-
- String message = String.format("restart sort failed for groupId
[%s] ", groupId);
- log.error(message, e);
- return ListenerResult.fail(message + e.getMessage());
}
- extList.forEach(groupExtInfo ->
kvConf.remove(InlongConstants.SORT_JOB_ID));
- saveInfo(groupId, InlongConstants.SORT_JOB_ID, flinkInfo.getJobId(),
extList);
- flinkOperation.pollJobStatus(flinkInfo, JobStatus.RUNNING);
return ListenerResult.success();
}
/**
- * Save ext info into list.
+ * Save stream ext info into list.
*/
- private void saveInfo(String inlongGroupId, String keyName, String
keyValue, List<InlongGroupExtInfo> extInfoList) {
- InlongGroupExtInfo extInfo = new InlongGroupExtInfo();
- extInfo.setInlongGroupId(inlongGroupId);
+ private void saveInfo(InlongStreamInfo streamInfo, String keyName, String
keyValue,
+ List<InlongStreamExtInfo> extInfoList) {
+ InlongStreamExtInfo extInfo = new InlongStreamExtInfo();
+ extInfo.setInlongGroupId(streamInfo.getInlongGroupId());
+ extInfo.setInlongStreamId(streamInfo.getInlongStreamId());
extInfo.setKeyName(keyName);
extInfo.setKeyValue(keyValue);
extInfoList.add(extInfo);
diff --git
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
index b65fb40ea6..0dc3cf08f3 100644
---
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
+++
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
@@ -18,6 +18,7 @@
package org.apache.inlong.manager.plugin.listener;
import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.consts.SinkType;
import org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.common.enums.TaskEvent;
import org.apache.inlong.manager.common.util.JsonUtils;
@@ -25,8 +26,8 @@ import org.apache.inlong.manager.plugin.flink.FlinkOperation;
import org.apache.inlong.manager.plugin.flink.FlinkService;
import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
import org.apache.inlong.manager.plugin.flink.enums.Constants;
-import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
-import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import
org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
@@ -36,9 +37,11 @@ import
org.apache.inlong.manager.workflow.event.task.SortOperateListener;
import com.fasterxml.jackson.core.type.TypeReference;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.JobStatus;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -71,7 +74,7 @@ public class StartupSortListener implements
SortOperateListener {
}
log.info("add startup group listener for groupId [{}]", groupId);
- return true;
+ return
InlongConstants.DATASYNC_MODE.equals(groupProcessForm.getGroupInfo().getInlongGroupMode());
}
@Override
@@ -94,65 +97,77 @@ public class StartupSortListener implements
SortOperateListener {
return ListenerResult.success();
}
- InlongGroupInfo inlongGroupInfo = groupResourceForm.getGroupInfo();
- List<InlongGroupExtInfo> extList = inlongGroupInfo.getExtList();
- log.info("inlong group ext info: {}", extList);
-
- Map<String, String> kvConf = extList.stream().filter(v ->
StringUtils.isNotEmpty(v.getKeyName())
- &&
StringUtils.isNotEmpty(v.getKeyValue())).collect(Collectors.toMap(
- InlongGroupExtInfo::getKeyName,
- InlongGroupExtInfo::getKeyValue));
- String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
- if (StringUtils.isNotEmpty(sortExt)) {
- Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(
- JsonUtils.OBJECT_MAPPER.readTree(sortExt), new
TypeReference<Map<String, String>>() {
- });
- kvConf.putAll(result);
- }
-
- String dataflow = kvConf.get(InlongConstants.DATAFLOW);
- if (StringUtils.isEmpty(dataflow)) {
- String message = String.format("dataflow is empty for groupId
[%s]", groupId);
- log.error(message);
- return ListenerResult.fail(message);
- }
-
- FlinkInfo flinkInfo = new FlinkInfo();
-
- String jobName = Constants.SORT_JOB_NAME_GENERATOR.apply(processForm);
- flinkInfo.setJobName(jobName);
- String sortUrl = kvConf.get(InlongConstants.SORT_URL);
- flinkInfo.setEndpoint(sortUrl);
- flinkInfo.setInlongStreamInfoList(groupResourceForm.getStreamInfos());
-
- FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint());
- FlinkOperation flinkOperation = new FlinkOperation(flinkService);
-
- try {
- flinkOperation.genPath(flinkInfo, dataflow);
- flinkOperation.start(flinkInfo);
- log.info("job submit success, jobId is [{}]",
flinkInfo.getJobId());
- } catch (Exception e) {
- flinkInfo.setException(true);
- flinkInfo.setExceptionMsg(getExceptionStackMsg(e));
+ for (InlongStreamInfo streamInfo : streamInfos) {
+ List<StreamSink> sinkList = streamInfo.getSinkList();
+ List<String> sinkTypes =
sinkList.stream().map(StreamSink::getSinkType).collect(Collectors.toList());
+ if (CollectionUtils.isEmpty(sinkList) ||
!SinkType.containSortFlinkSink(sinkTypes)) {
+ return ListenerResult.success();
+ }
+
+ List<InlongStreamExtInfo> extList = streamInfo.getExtList();
+ log.info("stream ext info: {}", extList);
+ Map<String, String> kvConf = extList.stream().filter(v ->
StringUtils.isNotEmpty(v.getKeyName())
+ &&
StringUtils.isNotEmpty(v.getKeyValue())).collect(Collectors.toMap(
+ InlongStreamExtInfo::getKeyName,
+ InlongStreamExtInfo::getKeyValue));
+
+ String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
+ if (StringUtils.isNotEmpty(sortExt)) {
+ Map<String, String> result =
JsonUtils.OBJECT_MAPPER.convertValue(
+ JsonUtils.OBJECT_MAPPER.readTree(sortExt), new
TypeReference<Map<String, String>>() {
+ });
+ kvConf.putAll(result);
+ }
+
+ String dataflow = kvConf.get(InlongConstants.DATAFLOW);
+ if (StringUtils.isEmpty(dataflow)) {
+ String message = String.format("dataflow is empty for groupId
[%s], streamId [%s]", groupId,
+ streamInfo.getInlongStreamId());
+ log.error(message);
+ return ListenerResult.fail(message);
+ }
+
+ FlinkInfo flinkInfo = new FlinkInfo();
+
+ String jobName =
Constants.SORT_JOB_NAME_GENERATOR.apply(processForm) +
streamInfo.getInlongStreamId();
+ flinkInfo.setJobName(jobName);
+ String sortUrl = kvConf.get(InlongConstants.SORT_URL);
+ flinkInfo.setEndpoint(sortUrl);
+
flinkInfo.setInlongStreamInfoList(Collections.singletonList(streamInfo));
+
+ FlinkService flinkService = new
FlinkService(flinkInfo.getEndpoint());
+ FlinkOperation flinkOperation = new FlinkOperation(flinkService);
+
+ try {
+ flinkOperation.genPath(flinkInfo, dataflow);
+ flinkOperation.start(flinkInfo);
+ log.info("job submit success for groupId = {}, streamId = {},
jobId = {}", groupId,
+ streamInfo.getInlongStreamId(), flinkInfo.getJobId());
+ } catch (Exception e) {
+ flinkInfo.setException(true);
+ flinkInfo.setExceptionMsg(getExceptionStackMsg(e));
+ flinkOperation.pollJobStatus(flinkInfo, JobStatus.RUNNING);
+
+ String message = String.format("startup sort failed for
groupId [%s], streamId [%s]", groupId,
+ streamInfo.getInlongStreamId());
+ log.error(message, e);
+ return ListenerResult.fail(message + e.getMessage());
+ }
+
+ saveInfo(streamInfo, InlongConstants.SORT_JOB_ID,
flinkInfo.getJobId(), extList);
flinkOperation.pollJobStatus(flinkInfo, JobStatus.RUNNING);
-
- String message = String.format("startup sort failed for groupId
[%s] ", groupId);
- log.error(message, e);
- return ListenerResult.fail(message + e.getMessage());
}
-
- saveInfo(groupId, InlongConstants.SORT_JOB_ID, flinkInfo.getJobId(),
extList);
- flinkOperation.pollJobStatus(flinkInfo, JobStatus.RUNNING);
return ListenerResult.success();
}
/**
- * Save ext info into list.
+ * Save stream ext info into list.
*/
- private void saveInfo(String inlongGroupId, String keyName, String
keyValue, List<InlongGroupExtInfo> extInfoList) {
- InlongGroupExtInfo extInfo = new InlongGroupExtInfo();
- extInfo.setInlongGroupId(inlongGroupId);
+ private void saveInfo(InlongStreamInfo streamInfo, String keyName, String
keyValue,
+ List<InlongStreamExtInfo> extInfoList) {
+ InlongStreamExtInfo extInfo = new InlongStreamExtInfo();
+ extInfo.setInlongGroupId(streamInfo.getInlongGroupId());
+ extInfo.setInlongStreamId(streamInfo.getInlongStreamId());
extInfo.setKeyName(keyName);
extInfo.setKeyValue(keyValue);
extInfoList.add(extInfo);
diff --git
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java
index 1f2a8fabef..b3a341c37d 100644
---
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java
+++
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java
@@ -17,12 +17,36 @@
package org.apache.inlong.manager.plugin.listener;
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.common.enums.TaskEvent;
+import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.plugin.flink.FlinkOperation;
+import org.apache.inlong.manager.plugin.flink.FlinkService;
+import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
+import org.apache.inlong.manager.plugin.flink.enums.Constants;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
+import
org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
+import com.fasterxml.jackson.core.type.TypeReference;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.JobStatus;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static
org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStackMsg;
/**
* Listener for startup the Sort task for InlongStream
@@ -41,13 +65,108 @@ public class StartupStreamListener implements
SortOperateListener {
*/
@Override
public boolean accept(WorkflowContext workflowContext) {
- log.info("not need to start the sort task for InlongStream");
- return false;
+ ProcessForm processForm = workflowContext.getProcessForm();
+ String groupId = processForm.getInlongGroupId();
+ if (!(processForm instanceof StreamResourceProcessForm)) {
+ log.info("not add startup stream listener, not
StreamResourceProcessForm for groupId [{}]", groupId);
+ return false;
+ }
+
+ StreamResourceProcessForm streamProcessForm =
(StreamResourceProcessForm) processForm;
+ String streamId =
streamProcessForm.getStreamInfo().getInlongStreamId();
+ if (streamProcessForm.getGroupOperateType() != GroupOperateType.INIT) {
+ log.info("not add startup stream listener, as the operate was not
INIT for groupId [{}] streamId [{}]",
+ groupId, streamId);
+ return false;
+ }
+
+ log.info("add startup stream listener for groupId [{}] streamId [{}]",
groupId, streamId);
+ return
InlongConstants.STANDARD_MODE.equals(streamProcessForm.getGroupInfo().getInlongGroupMode());
}
@Override
public ListenerResult listen(WorkflowContext context) throws Exception {
+ ProcessForm processForm = context.getProcessForm();
+ StreamResourceProcessForm streamResourceProcessForm =
(StreamResourceProcessForm) processForm;
+ InlongStreamInfo streamInfo =
streamResourceProcessForm.getStreamInfo();
+ List<InlongStreamExtInfo> streamExtList = streamInfo.getExtList();
+ log.info("inlong stream :{} ext info: {}",
streamInfo.getInlongStreamId(), streamExtList);
+ final String groupId = streamInfo.getInlongGroupId();
+ final String streamId = streamInfo.getInlongStreamId();
+
+ List<StreamSink> sinkList = streamInfo.getSinkList();
+ List<String> sinkTypes =
sinkList.stream().map(StreamSink::getSinkType).collect(Collectors.toList());
+ if (CollectionUtils.isEmpty(sinkList) ||
!SinkType.containSortFlinkSink(sinkTypes)) {
+ log.warn("not any sink configured for group {} and stream {}, skip
launching sort job", groupId, streamId);
+ return ListenerResult.success();
+ }
+
+ List<InlongStreamExtInfo> extList = streamInfo.getExtList();
+ log.info("stream ext info: {}", extList);
+ Map<String, String> kvConf = extList.stream().filter(v ->
StringUtils.isNotEmpty(v.getKeyName())
+ &&
StringUtils.isNotEmpty(v.getKeyValue())).collect(Collectors.toMap(
+ InlongStreamExtInfo::getKeyName,
+ InlongStreamExtInfo::getKeyValue));
+
+ String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
+ if (StringUtils.isNotEmpty(sortExt)) {
+ Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(
+ JsonUtils.OBJECT_MAPPER.readTree(sortExt), new
TypeReference<Map<String, String>>() {
+ });
+ kvConf.putAll(result);
+ }
+
+ String dataflow = kvConf.get(InlongConstants.DATAFLOW);
+ if (StringUtils.isEmpty(dataflow)) {
+ String message = String.format("dataflow is empty for groupId
[%s], streamId [%s]", groupId,
+ streamInfo.getInlongStreamId());
+ log.error(message);
+ return ListenerResult.fail(message);
+ }
+
+ FlinkInfo flinkInfo = new FlinkInfo();
+
+ String jobName = Constants.SORT_JOB_NAME_GENERATOR.apply(processForm)
+ streamInfo.getInlongStreamId();
+ flinkInfo.setJobName(jobName);
+ String sortUrl = kvConf.get(InlongConstants.SORT_URL);
+ flinkInfo.setEndpoint(sortUrl);
+
flinkInfo.setInlongStreamInfoList(Collections.singletonList(streamInfo));
+
+ FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint());
+ FlinkOperation flinkOperation = new FlinkOperation(flinkService);
+
+ try {
+ flinkOperation.genPath(flinkInfo, dataflow);
+ flinkOperation.start(flinkInfo);
+ log.info("job submit success for groupId = {}, streamId = {},
jobId = {}", groupId,
+ streamInfo.getInlongStreamId(), flinkInfo.getJobId());
+ } catch (Exception e) {
+ flinkInfo.setException(true);
+ flinkInfo.setExceptionMsg(getExceptionStackMsg(e));
+ flinkOperation.pollJobStatus(flinkInfo, JobStatus.RUNNING);
+
+ String message = String.format("startup sort failed for groupId
[%s], streamId [%s]", groupId,
+ streamInfo.getInlongStreamId());
+ log.error(message, e);
+ return ListenerResult.fail(message + e.getMessage());
+ }
+
+ saveInfo(streamInfo, InlongConstants.SORT_JOB_ID,
flinkInfo.getJobId(), extList);
+ flinkOperation.pollJobStatus(flinkInfo, JobStatus.RUNNING);
return ListenerResult.success();
}
+ /**
+ * Save stream ext info into list.
+ */
+ private void saveInfo(InlongStreamInfo streamInfo, String keyName, String
keyValue,
+ List<InlongStreamExtInfo> extInfoList) {
+ InlongStreamExtInfo extInfo = new InlongStreamExtInfo();
+ extInfo.setInlongGroupId(streamInfo.getInlongGroupId());
+ extInfo.setInlongStreamId(streamInfo.getInlongStreamId());
+ extInfo.setKeyName(keyName);
+ extInfo.setKeyValue(keyValue);
+ extInfoList.add(extInfo);
+ }
+
}
diff --git
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
index affe9bbd22..06a76e1bf7 100644
---
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
+++
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
@@ -24,8 +24,9 @@ import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.plugin.flink.FlinkOperation;
import org.apache.inlong.manager.plugin.flink.FlinkService;
import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
-import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
-import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import
org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
import org.apache.inlong.manager.workflow.WorkflowContext;
@@ -34,6 +35,7 @@ import
org.apache.inlong.manager.workflow.event.task.SortOperateListener;
import com.fasterxml.jackson.core.type.TypeReference;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.JobStatus;
@@ -84,47 +86,56 @@ public class SuspendSortListener implements
SortOperateListener {
}
GroupResourceProcessForm groupResourceProcessForm =
(GroupResourceProcessForm) processForm;
- InlongGroupInfo inlongGroupInfo =
groupResourceProcessForm.getGroupInfo();
- List<InlongGroupExtInfo> extList = inlongGroupInfo.getExtList();
- log.info("inlong group ext info: {}", extList);
-
- Map<String, String> kvConf = new HashMap<>();
- extList.forEach(groupExtInfo -> kvConf.put(groupExtInfo.getKeyName(),
groupExtInfo.getKeyValue()));
- String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
- if (StringUtils.isNotEmpty(sortExt)) {
- Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(
- JsonUtils.OBJECT_MAPPER.readTree(sortExt), new
TypeReference<Map<String, String>>() {
- });
- kvConf.putAll(result);
- }
-
- String jobId = kvConf.get(InlongConstants.SORT_JOB_ID);
- if (StringUtils.isBlank(jobId)) {
- String message = String.format("sort job id is empty for groupId
[%s]", groupId);
- return ListenerResult.fail(message);
- }
-
- FlinkInfo flinkInfo = new FlinkInfo();
- flinkInfo.setJobId(jobId);
- String sortUrl = kvConf.get(InlongConstants.SORT_URL);
- flinkInfo.setEndpoint(sortUrl);
-
- FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint());
- FlinkOperation flinkOperation = new FlinkOperation(flinkService);
- try {
- // todo Currently, savepoint is not being used to stop, but will
be improved in the future
- flinkOperation.delete(flinkInfo);
- log.info("job suspend success for [{}]", jobId);
- } catch (Exception e) {
- flinkInfo.setException(true);
- flinkInfo.setExceptionMsg(getExceptionStackMsg(e));
+ List<InlongStreamInfo> streamInfos =
groupResourceProcessForm.getStreamInfos();
+ for (InlongStreamInfo streamInfo : streamInfos) {
+ List<StreamSink> sinkList = streamInfo.getSinkList();
+ if (CollectionUtils.isEmpty(sinkList)) {
+ continue;
+ }
+ List<InlongStreamExtInfo> extList = streamInfo.getExtList();
+ log.info("stream ext info: {}", extList);
+
+ Map<String, String> kvConf = new HashMap<>();
+ extList.forEach(v -> kvConf.put(v.getKeyName(), v.getKeyValue()));
+ String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
+ if (StringUtils.isNotEmpty(sortExt)) {
+ Map<String, String> result =
JsonUtils.OBJECT_MAPPER.convertValue(
+ JsonUtils.OBJECT_MAPPER.readTree(sortExt), new
TypeReference<Map<String, String>>() {
+ });
+ kvConf.putAll(result);
+ }
+
+ String jobId = kvConf.get(InlongConstants.SORT_JOB_ID);
+ if (StringUtils.isBlank(jobId)) {
+ String message = String.format("sort job id is empty for
groupId [%s]", groupId,
+ streamInfo.getInlongStreamId());
+ return ListenerResult.fail(message);
+ }
+
+ FlinkInfo flinkInfo = new FlinkInfo();
+ flinkInfo.setJobId(jobId);
+ String sortUrl = kvConf.get(InlongConstants.SORT_URL);
+ flinkInfo.setEndpoint(sortUrl);
+
+ FlinkService flinkService = new
FlinkService(flinkInfo.getEndpoint());
+ FlinkOperation flinkOperation = new FlinkOperation(flinkService);
+ try {
+ // todo Currently, savepoint is not being used to stop, but
will be improved in the future
+ flinkOperation.delete(flinkInfo);
+ log.info("job suspend success for groupId = {}, streamId ={},
jobId = {}", groupId,
+ streamInfo.getInlongStreamId(), jobId);
+ } catch (Exception e) {
+ flinkInfo.setException(true);
+ flinkInfo.setExceptionMsg(getExceptionStackMsg(e));
+ flinkOperation.pollJobStatus(flinkInfo, JobStatus.CANCELED);
+
+ String message = String.format("suspend sort failed for
groupId [%s], streamId [%s]", groupId,
+ streamInfo.getInlongStreamId());
+ log.error(message, e);
+ return ListenerResult.fail(message + e.getMessage());
+ }
flinkOperation.pollJobStatus(flinkInfo, JobStatus.CANCELED);
-
- String message = String.format("suspend sort failed for groupId
[%s] ", groupId);
- log.error(message, e);
- return ListenerResult.fail(message + e.getMessage());
}
- flinkOperation.pollJobStatus(flinkInfo, JobStatus.CANCELED);
return ListenerResult.success();
}
diff --git
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/poller/SortStatusPoller.java
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/poller/SortStatusPoller.java
index 0f689c0e3a..84509b8a0e 100644
---
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/poller/SortStatusPoller.java
+++
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/poller/SortStatusPoller.java
@@ -21,9 +21,9 @@ import
org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.SortStatus;
import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.plugin.flink.FlinkService;
-import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
-import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.sort.SortStatusInfo;
+import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.workflow.plugin.sort.SortPoller;
import com.fasterxml.jackson.core.type.TypeReference;
@@ -66,22 +66,21 @@ public class SortStatusPoller implements SortPoller {
}
@Override
- public List<SortStatusInfo> pollSortStatus(List<InlongGroupInfo>
groupInfos, String credentials) {
- log.debug("begin to poll sort status for inlong groups");
- if (CollectionUtils.isEmpty(groupInfos)) {
- log.debug("end to poll sort status, as the inlong groups is
empty");
+ public List<SortStatusInfo> pollSortStatus(List<InlongStreamInfo>
streamInfos, String credentials) {
+ log.debug("begin to poll sort status for stream");
+ if (CollectionUtils.isEmpty(streamInfos)) {
+ log.debug("end to poll sort status, as the stream list is empty");
return Collections.emptyList();
}
- List<SortStatusInfo> statusInfos = new ArrayList<>(groupInfos.size());
- for (InlongGroupInfo groupInfo : groupInfos) {
- String groupId = groupInfo.getInlongGroupId();
+ List<SortStatusInfo> statusInfos = new ArrayList<>(streamInfos.size());
+ for (InlongStreamInfo streamInfo : streamInfos) {
try {
- List<InlongGroupExtInfo> extList = groupInfo.getExtList();
- log.debug("inlong group {} ext info: {}", groupId, extList);
+ List<InlongStreamExtInfo> extList = streamInfo.getExtList();
+ log.debug("stream {} ext info: {}",
streamInfo.getInlongStreamId(), extList);
Map<String, String> kvConf = new HashMap<>();
- extList.forEach(groupExtInfo ->
kvConf.put(groupExtInfo.getKeyName(), groupExtInfo.getKeyValue()));
+ extList.forEach(v -> kvConf.put(v.getKeyName(),
v.getKeyValue()));
String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
if (StringUtils.isNotEmpty(sortExt)) {
Map<String, String> result =
JsonUtils.OBJECT_MAPPER.convertValue(
@@ -91,7 +90,10 @@ public class SortStatusPoller implements SortPoller {
}
String jobId = kvConf.get(InlongConstants.SORT_JOB_ID);
- SortStatusInfo statusInfo =
SortStatusInfo.builder().inlongGroupId(groupId).build();
+ SortStatusInfo statusInfo = SortStatusInfo.builder()
+ .inlongGroupId(streamInfo.getInlongGroupId())
+ .inlongStreamId(streamInfo.getInlongStreamId())
+ .build();
if (StringUtils.isBlank(jobId)) {
statusInfo.setSortStatus(SortStatus.NOT_EXISTS);
statusInfos.add(statusInfo);
@@ -104,7 +106,8 @@ public class SortStatusPoller implements SortPoller {
JOB_SORT_STATUS_MAP.getOrDefault(flinkService.getJobStatus(jobId),
SortStatus.UNKNOWN));
statusInfos.add(statusInfo);
} catch (Exception e) {
- log.error("polling sort status failed for groupId=" + groupId,
e);
+ log.error("polling sort status failed for groupId=" +
streamInfo.getInlongGroupId() + " streamId="
+ + streamInfo.getInlongStreamId(), e);
}
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/SortStatusInfo.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/SortStatusInfo.java
index 6fd04ba855..7950c9cc1c 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/SortStatusInfo.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/SortStatusInfo.java
@@ -41,6 +41,9 @@ public class SortStatusInfo {
@ApiModelProperty(value = "Inlong group id")
private String inlongGroupId;
+ @ApiModelProperty(value = "Inlong stream id")
+ private String inlongStreamId;
+
@ApiModelProperty(value = "Sort status info")
private SortStatus sortStatus;
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
index 437e80c035..0072a877f1 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
@@ -26,10 +26,12 @@ import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.sort.SortStatusInfo;
import org.apache.inlong.manager.pojo.sort.SortStatusRequest;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.service.core.SortClusterService;
import org.apache.inlong.manager.service.core.SortService;
import org.apache.inlong.manager.service.core.SortSourceService;
import org.apache.inlong.manager.service.group.InlongGroupService;
+import org.apache.inlong.manager.service.stream.InlongStreamService;
import org.apache.inlong.manager.workflow.plugin.sort.PollerPlugin;
import org.apache.inlong.manager.workflow.plugin.sort.SortPoller;
@@ -38,6 +40,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
+import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
@@ -58,6 +61,8 @@ public class SortServiceImpl implements SortService,
PluginBinder {
private SortClusterService sortClusterService;
@Autowired
private InlongGroupService groupService;
+ @Autowired
+ private InlongStreamService streamService;
/**
* The plugin poller will be initialed after the application starts.
@@ -92,8 +97,11 @@ public class SortServiceImpl implements SortService,
PluginBinder {
})
.filter(Objects::nonNull)
.collect(Collectors.toList());
-
- List<SortStatusInfo> statusInfos =
sortPoller.pollSortStatus(groupInfoList, request.getCredentials());
+ List<InlongStreamInfo> streamInfos = new ArrayList<>();
+ groupInfoList.forEach(groupInfo -> {
+
streamInfos.addAll(streamService.list(groupInfo.getInlongGroupId()));
+ });
+ List<SortStatusInfo> statusInfos =
sortPoller.pollSortStatus(streamInfos, request.getCredentials());
log.debug("success to list sort status for request={}, result={}",
request, statusInfos);
return statusInfos;
} catch (Exception e) {
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
index 4e994f8be1..62f9ee7da3 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
@@ -29,9 +29,11 @@ import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.entity.InlongGroupExtEntity;
+import org.apache.inlong.manager.dao.entity.InlongStreamExtEntity;
import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongGroupExtEntityMapper;
+import org.apache.inlong.manager.dao.mapper.InlongStreamExtEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
import org.apache.inlong.manager.pojo.common.OrderFieldEnum;
@@ -107,6 +109,8 @@ public class InlongGroupServiceImpl implements
InlongGroupService {
@Autowired
private StreamSourceEntityMapper streamSourceMapper;
@Autowired
+ private InlongStreamExtEntityMapper streamExtMapper;
+ @Autowired
private InlongClusterService clusterService;
@Autowired
@@ -211,7 +215,8 @@ public class InlongGroupServiceImpl implements
InlongGroupService {
List<InlongGroupExtEntity> extEntityList =
groupExtMapper.selectByGroupId(groupId);
List<InlongGroupExtInfo> extList =
CommonBeanUtils.copyListProperties(extEntityList, InlongGroupExtInfo::new);
groupInfo.setExtList(extList);
- BaseSortConf sortConf = buildSortConfig(extList);
+ List<InlongStreamExtEntity> streamExtEntities =
streamExtMapper.selectByRelatedId(groupId, null);
+ BaseSortConf sortConf = buildSortConfig(streamExtEntities);
groupInfo.setSortConf(sortConf);
LOGGER.debug("success to get inlong group for groupId={}", groupId);
@@ -232,7 +237,8 @@ public class InlongGroupServiceImpl implements
InlongGroupService {
List<InlongGroupExtEntity> extEntityList =
groupExtMapper.selectByGroupId(groupId);
List<InlongGroupExtInfo> extList =
CommonBeanUtils.copyListProperties(extEntityList, InlongGroupExtInfo::new);
groupInfo.setExtList(extList);
- BaseSortConf sortConf = buildSortConfig(extList);
+ List<InlongStreamExtEntity> streamExtEntities =
streamExtMapper.selectByRelatedId(groupId, null);
+ BaseSortConf sortConf = buildSortConfig(streamExtEntities);
groupInfo.setSortConf(sortConf);
return groupInfo;
}
@@ -595,7 +601,7 @@ public class InlongGroupServiceImpl implements
InlongGroupService {
return true;
}
- private BaseSortConf buildSortConfig(List<InlongGroupExtInfo> extInfos) {
+ private BaseSortConf buildSortConfig(List<InlongStreamExtEntity> extInfos)
{
Map<String, String> extMap = new HashMap<>();
extInfos.forEach(extInfo -> extMap.put(extInfo.getKeyName(),
extInfo.getKeyValue()));
String type = extMap.get(InlongConstants.SORT_TYPE);
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java
index b8d776c6e7..ebed9e72e3 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java
@@ -17,11 +17,13 @@
package org.apache.inlong.manager.service.listener.sort;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.enums.TaskEvent;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import
org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
@@ -41,6 +43,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
+import java.util.stream.Collectors;
/**
* Event listener of build the Sort config,
@@ -121,8 +124,18 @@ public class SortConfigListener implements
SortOperateListener {
}
try {
- SortConfigOperator operator =
operatorFactory.getInstance(groupInfo.getEnableZookeeper());
- operator.buildConfig(groupInfo, streamInfos, false);
+ for (InlongStreamInfo streamInfo : streamInfos) {
+ List<StreamSink> sinkList = streamInfo.getSinkList();
+ if (CollectionUtils.isEmpty(sinkList)) {
+ continue;
+ }
+ List<String> sinkTypeList =
sinkList.stream().map(StreamSink::getSinkType).collect(Collectors.toList());
+ List<SortConfigOperator> operatorList =
operatorFactory.getInstance(sinkTypeList);
+ for (SortConfigOperator operator : operatorList) {
+ operator.buildConfig(groupInfo, streamInfo,
+
InlongConstants.STANDARD_MODE.equals(groupInfo.getInlongGroupMode()));
+ }
+ }
} catch (Exception e) {
String msg = String.format("failed to build sort config for
groupId=%s, ", groupId);
LOGGER.error(msg + "streamInfos=" + streamInfos, e);
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/StreamSortConfigListener.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/StreamSortConfigListener.java
index 39ac951535..070f71c5a6 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/StreamSortConfigListener.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/StreamSortConfigListener.java
@@ -17,6 +17,7 @@
package org.apache.inlong.manager.service.listener.sort;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.enums.TaskEvent;
@@ -41,8 +42,8 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import java.util.Collections;
import java.util.List;
+import java.util.stream.Collectors;
/**
* Event listener of build the Sort config for one inlong stream,
@@ -85,6 +86,17 @@ public class StreamSortConfigListener implements
SortOperateListener {
InlongStreamInfo streamInfo = form.getStreamInfo();
final String groupId = streamInfo.getInlongGroupId();
final String streamId = streamInfo.getInlongStreamId();
+ // Read the current information
+ InlongGroupInfo groupInfo = groupService.get(groupId);
+ if (groupInfo == null) {
+ String msg = "inlong group not found with groupId=" + groupId;
+ LOGGER.error(msg);
+ throw new WorkflowListenerException(msg);
+ }
+ form.setGroupInfo(groupInfo);
+ form.setStreamInfo(streamService.get(groupId, streamId));
+ groupInfo = form.getGroupInfo();
+ streamInfo = form.getStreamInfo();
LOGGER.info("begin to build sort config for groupId={}, streamId={}",
groupId, streamId);
GroupOperateType operateType = form.getGroupOperateType();
@@ -94,7 +106,6 @@ public class StreamSortConfigListener implements
SortOperateListener {
return ListenerResult.success();
}
- InlongGroupInfo groupInfo = groupService.get(groupId);
GroupStatus groupStatus = GroupStatus.forCode(groupInfo.getStatus());
Preconditions.expectTrue(GroupStatus.CONFIG_FAILED != groupStatus,
String.format("group status=%s not support start stream for
groupId=%s", groupStatus, groupId));
@@ -103,17 +114,19 @@ public class StreamSortConfigListener implements
SortOperateListener {
LOGGER.warn("not build sort config for groupId={}, streamId={}, as
not found any sinks", groupId, streamId);
return ListenerResult.success();
}
- // Read the current information
- form.setGroupInfo(groupInfo);
- form.setStreamInfo(streamService.get(groupId, streamId));
- List<InlongStreamInfo> streamInfos =
Collections.singletonList(streamInfo);
try {
- SortConfigOperator operator =
operatorFactory.getInstance(groupInfo.getEnableZookeeper());
- operator.buildConfig(groupInfo, streamInfos, true);
+
+ List<String> sinkTypeList =
streamSinks.stream().map(StreamSink::getSinkType).collect(Collectors.toList());
+ List<SortConfigOperator> operatorList =
operatorFactory.getInstance(sinkTypeList);
+ for (SortConfigOperator operator : operatorList) {
+ operator.buildConfig(groupInfo, streamInfo,
+
InlongConstants.SYNC_SEND.equals(groupInfo.getInlongGroupMode()));
+ }
+
} catch (Exception e) {
String msg = String.format("failed to build sort config for
groupId=%s, streamId=%s, ", groupId, streamId);
- LOGGER.error(msg + "streamInfos=" + streamInfos, e);
+ LOGGER.error(msg + "streamInfo=" + streamInfo, e);
throw new WorkflowListenerException(msg + e.getMessage());
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
index 11c4e3d6f6..08bc53cbf0 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
@@ -18,13 +18,15 @@
package org.apache.inlong.manager.service.resource.sort;
import org.apache.inlong.manager.common.consts.InlongConstants;
-import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.sink.StreamSink;
import org.apache.inlong.manager.pojo.sort.node.NodeFactory;
import org.apache.inlong.manager.pojo.sort.util.NodeRelationUtils;
import org.apache.inlong.manager.pojo.sort.util.TransformNodeUtils;
import org.apache.inlong.manager.pojo.source.StreamSource;
+import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.stream.StreamField;
import org.apache.inlong.manager.pojo.transform.TransformResponse;
@@ -48,6 +50,7 @@ import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -78,111 +81,120 @@ public class DefaultSortConfigOperator implements
SortConfigOperator {
private AuditService auditService;
@Override
- public Boolean accept(Integer enableZk) {
- return InlongConstants.DISABLE_ZK.equals(enableZk);
+ public Boolean accept(List<String> sinkTypeList) {
+ for (String sinkType : sinkTypeList) {
+ if (SinkType.SORT_FLINK_SINK.contains(sinkType)) {
+ return true;
+ }
+ }
+ return false;
}
@Override
- public void buildConfig(InlongGroupInfo groupInfo, List<InlongStreamInfo>
streamInfos, boolean isStream)
+ public void buildConfig(InlongGroupInfo groupInfo, InlongStreamInfo
streamInfo, boolean isStream)
throws Exception {
if (isStream) {
LOGGER.warn("no need to build sort config for stream process when
disable zk");
return;
}
- if (groupInfo == null || CollectionUtils.isEmpty(streamInfos)) {
- LOGGER.warn("no need to build sort config as the group is null or
streams is empty when disable zk");
+ if (groupInfo == null || streamInfo == null) {
+ LOGGER.warn("no need to build sort config as the group is null or
stream is empty when disable zk");
return;
}
-
- GroupInfo sortConfigInfo = this.getGroupInfo(groupInfo, streamInfos);
+ List<StreamSink> sinkList = new ArrayList<>();
+ for (StreamSink sink : streamInfo.getSinkList()) {
+ if (SinkType.SORT_FLINK_SINK.contains(sink.getSinkType())) {
+ sinkList.add(sink);
+ }
+ }
+ if (CollectionUtils.isEmpty(sinkList)) {
+ return;
+ }
+ GroupInfo sortConfigInfo = this.getGroupInfo(groupInfo, streamInfo,
sinkList);
String dataflow = OBJECT_MAPPER.writeValueAsString(sortConfigInfo);
- this.addToGroupExt(groupInfo, dataflow);
+ this.addToStreamExt(streamInfo, dataflow);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("success to build sort config, isStream={},
dataflow={}", isStream, dataflow);
}
}
- private GroupInfo getGroupInfo(InlongGroupInfo groupInfo,
List<InlongStreamInfo> streamInfoList) {
+ private GroupInfo getGroupInfo(InlongGroupInfo groupInfo, InlongStreamInfo
inlongStreamInfo,
+ List<StreamSink> sinkInfos) {
+ String streamId = inlongStreamInfo.getInlongStreamId();
// get source info
- Map<String, List<StreamSource>> sourceMap =
sourceService.getSourcesMap(groupInfo, streamInfoList);
- // get sink info
- Map<String, List<StreamSink>> sinkMap =
sinkService.getSinksMap(groupInfo, streamInfoList);
- List<TransformResponse> transformList =
transformService.listTransform(groupInfo.getInlongGroupId(), null);
+ Map<String, List<StreamSource>> sourceMap =
sourceService.getSourcesMap(groupInfo,
+ Collections.singletonList(inlongStreamInfo));
+ List<TransformResponse> transformList =
transformService.listTransform(groupInfo.getInlongGroupId(), streamId);
Map<String, List<TransformResponse>> transformMap =
transformList.stream()
.collect(Collectors.groupingBy(TransformResponse::getInlongStreamId,
HashMap::new,
Collectors.toCollection(ArrayList::new)));
List<StreamInfo> sortStreamInfos = new ArrayList<>();
- for (InlongStreamInfo inlongStream : streamInfoList) {
- String streamId = inlongStream.getInlongStreamId();
- Map<String, StreamField> fieldMap = new HashMap<>();
- inlongStream.getSourceList().forEach(
- source -> parseConstantFieldMap(source.getSourceName(),
source.getFieldList(), fieldMap));
+ Map<String, StreamField> fieldMap = new HashMap<>();
+ inlongStreamInfo.getSourceList().forEach(
+ source -> parseConstantFieldMap(source.getSourceName(),
source.getFieldList(), fieldMap));
+
+ List<TransformResponse> transformResponses =
transformMap.get(streamId);
+ if (CollectionUtils.isNotEmpty(transformResponses)) {
+ transformResponses.forEach(
+ trans -> parseConstantFieldMap(trans.getTransformName(),
trans.getFieldList(), fieldMap));
+ }
- List<TransformResponse> transformResponses =
transformMap.get(streamId);
- if (CollectionUtils.isNotEmpty(transformResponses)) {
- transformResponses.forEach(
- trans ->
parseConstantFieldMap(trans.getTransformName(), trans.getFieldList(),
fieldMap));
- }
+ // build a stream info from the nodes and relations
+ List<StreamSource> sources = sourceMap.get(streamId);
+ for (StreamSink sinkInfo : sinkInfos) {
+ CommonBeanUtils.copyProperties(inlongStreamInfo, sinkInfo, true);
+ addAuditId(sinkInfo.getProperties(), sinkInfo.getSinkType(), true);
+ }
- // build a stream info from the nodes and relations
- List<StreamSource> sources = sourceMap.get(streamId);
- List<StreamSink> sinks = sinkMap.get(streamId);
+ for (StreamSource source : sources) {
+ source.setFieldList(inlongStreamInfo.getFieldList());
+ }
+ List<NodeRelation> relations;
- for (StreamSink sink : sinks) {
- addAuditId(sink.getProperties(), sink.getSinkType(), true);
- }
- for (StreamSource source : sources) {
- source.setFieldList(inlongStream.getFieldList());
+ if
(InlongConstants.STANDARD_MODE.equals(groupInfo.getInlongGroupMode())) {
+ if (CollectionUtils.isNotEmpty(transformResponses)) {
+ relations =
NodeRelationUtils.createNodeRelations(inlongStreamInfo);
+ // in standard mode(include Data Ingestion and
Synchronization), replace upstream source node and
+ // transform input fields node to MQ node (which is InLong
stream id)
+ String mqNodeName = sources.get(0).getSourceName();
+ Set<String> nodeNameSet = getInputNodeNames(sources,
transformResponses);
+ adjustTransformField(transformResponses, nodeNameSet,
mqNodeName);
+ adjustNodeRelations(relations, nodeNameSet, mqNodeName);
+ } else {
+ relations = NodeRelationUtils.createNodeRelations(sources,
sinkInfos);
}
- List<NodeRelation> relations;
-
- if
(InlongConstants.STANDARD_MODE.equals(groupInfo.getInlongGroupMode())) {
- if (CollectionUtils.isNotEmpty(transformResponses)) {
- relations =
NodeRelationUtils.createNodeRelations(inlongStream);
- // in standard mode(include Data Ingestion and
Synchronization), replace upstream source node and
- // transform input fields node to MQ node (which is InLong
stream id)
- String mqNodeName = sources.get(0).getSourceName();
- Set<String> nodeNameSet = getInputNodeNames(sources,
transformResponses);
- adjustTransformField(transformResponses, nodeNameSet,
mqNodeName);
- adjustNodeRelations(relations, nodeNameSet, mqNodeName);
- } else {
- relations = NodeRelationUtils.createNodeRelations(sources,
sinks);
- }
- if (sources.size() == sinks.size()) {
- for (int i = 0; i < sinks.size(); i++) {
- addAuditId(sources.get(i).getProperties(),
sinks.get(i).getSinkType(), false);
- }
- }
+ for (int i = 0; i < sources.size(); i++) {
+ addAuditId(sources.get(i).getProperties(),
sinkInfos.get(0).getSinkType(), false);
+ }
+ } else {
+ if (CollectionUtils.isNotEmpty(transformResponses)) {
+ List<String> sourcesNames =
sources.stream().map(StreamSource::getSourceName)
+ .collect(Collectors.toList());
+ List<String> transFormNames =
transformResponses.stream().map(TransformResponse::getTransformName)
+ .collect(Collectors.toList());
+ relations =
Arrays.asList(NodeRelationUtils.createNodeRelation(sourcesNames,
transFormNames),
+ NodeRelationUtils.createNodeRelation(transFormNames,
+
sinkInfos.stream().map(StreamSink::getSinkName).collect(Collectors.toList())));
} else {
- if (CollectionUtils.isNotEmpty(transformResponses)) {
- List<String> sourcesNames =
sources.stream().map(StreamSource::getSourceName)
- .collect(Collectors.toList());
- List<String> transFormNames =
transformResponses.stream().map(TransformResponse::getTransformName)
- .collect(Collectors.toList());
- List<String> sinkNames =
sinks.stream().map(StreamSink::getSinkName).collect(Collectors.toList());
- relations =
Arrays.asList(NodeRelationUtils.createNodeRelation(sourcesNames,
transFormNames),
-
NodeRelationUtils.createNodeRelation(transFormNames, sinkNames));
- } else {
- relations = NodeRelationUtils.createNodeRelations(sources,
sinks);
- }
+ relations = NodeRelationUtils.createNodeRelations(sources,
sinkInfos);
+ }
- for (StreamSource source : sources) {
- addAuditId(source.getProperties(), source.getSourceType(),
false);
- }
+ for (StreamSource source : sources) {
+ addAuditId(source.getProperties(), source.getSourceType(),
false);
}
+ }
- // create extract-transform-load nodes
- List<Node> nodes = this.createNodes(sources, transformResponses,
sinks, fieldMap);
+ // create extract-transform-load nodes
+ List<Node> nodes = this.createNodes(sources, transformResponses,
sinkInfos, fieldMap);
- StreamInfo streamInfo = new StreamInfo(streamId, nodes, relations);
- sortStreamInfos.add(streamInfo);
+ StreamInfo streamInfo = new StreamInfo(streamId, nodes, relations);
+ sortStreamInfos.add(streamInfo);
- // rebuild joinerNode relation
- NodeRelationUtils.optimizeNodeRelation(streamInfo,
transformResponses);
- }
+ // rebuild joinerNode relation
+ NodeRelationUtils.optimizeNodeRelation(streamInfo, transformResponses);
return new GroupInfo(groupInfo.getInlongGroupId(), sortStreamInfos);
}
@@ -265,20 +277,21 @@ public class DefaultSortConfigOperator implements
SortConfigOperator {
}
/**
- * Add config into inlong group ext info
+ * Add config into inlong stream ext info
*/
- private void addToGroupExt(InlongGroupInfo groupInfo, String value) {
- if (groupInfo.getExtList() == null) {
- groupInfo.setExtList(new ArrayList<>());
+ private void addToStreamExt(InlongStreamInfo streamInfo, String value) {
+ if (streamInfo.getExtList() == null) {
+ streamInfo.setExtList(new ArrayList<>());
}
- InlongGroupExtInfo extInfo = new InlongGroupExtInfo();
- extInfo.setInlongGroupId(groupInfo.getInlongGroupId());
+ InlongStreamExtInfo extInfo = new InlongStreamExtInfo();
+ extInfo.setInlongGroupId(streamInfo.getInlongGroupId());
+ extInfo.setInlongStreamId(streamInfo.getInlongStreamId());
extInfo.setKeyName(InlongConstants.DATAFLOW);
extInfo.setKeyValue(value);
- groupInfo.getExtList().removeIf(ext ->
extInfo.getKeyName().equals(ext.getKeyName()));
- groupInfo.getExtList().add(extInfo);
+ streamInfo.getExtList().removeIf(ext ->
extInfo.getKeyName().equals(ext.getKeyName()));
+ streamInfo.getExtList().add(extInfo);
}
private void addAuditId(Map<String, Object> properties, String type,
boolean isSent) {
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/SortConfigOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/SortConfigOperator.java
index ce0e31f589..bd4f3ee712 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/SortConfigOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/SortConfigOperator.java
@@ -30,17 +30,17 @@ public interface SortConfigOperator {
/**
* Determines whether the current instance matches the specified type.
*
- * @param enableZk is the inlong group enable the ZooKeeper, 1: enable, 0:
disable
+ * @param sinkTypeList sink type list
*/
- Boolean accept(Integer enableZk);
+ Boolean accept(List<String> sinkTypeList);
/**
* Build Sort config.
*
* @param groupInfo inlong group info
- * @param streamInfos inlong stream info list
+ * @param streamInfo inlong stream info
* @param isStream is the config built for inlong stream
*/
- void buildConfig(InlongGroupInfo groupInfo, List<InlongStreamInfo>
streamInfos, boolean isStream) throws Exception;
+ void buildConfig(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo,
boolean isStream) throws Exception;
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/SortConfigOperatorFactory.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/SortConfigOperatorFactory.java
index 95a9c5c39f..baa9708f68 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/SortConfigOperatorFactory.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/SortConfigOperatorFactory.java
@@ -17,12 +17,11 @@
package org.apache.inlong.manager.service.resource.sort;
-import org.apache.inlong.manager.common.exceptions.BusinessException;
-
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
+import java.util.stream.Collectors;
/**
* Factory for {@link SortConfigOperator}.
@@ -36,14 +35,10 @@ public class SortConfigOperatorFactory {
/**
* Get a Sort config operator instance.
*
- * @param enableZk is the inlong group enable the ZooKeeper, 1: enable, 0:
disable
+ * @param sinkTypeList sink type
*/
- public SortConfigOperator getInstance(Integer enableZk) {
- return operatorList.stream()
- .filter(inst -> inst.accept(enableZk))
- .findFirst()
- .orElseThrow(() -> new BusinessException("not found any
instance of SortConfigOperator when enableZk="
- + enableZk));
+ public List<SortConfigOperator> getInstance(List<String> sinkTypeList) {
+ return operatorList.stream().filter(inst ->
inst.accept(sinkTypeList)).collect(Collectors.toList());
}
}
diff --git
a/inlong-manager/manager-test/src/main/resources/application-unit-test.properties
b/inlong-manager/manager-test/src/main/resources/application-unit-test.properties
index aa9155e735..17e8ccd2ae 100644
---
a/inlong-manager/manager-test/src/main/resources/application-unit-test.properties
+++
b/inlong-manager/manager-test/src/main/resources/application-unit-test.properties
@@ -70,3 +70,5 @@ common.http-client.connectionRequestTimeout=3000
# tencent cloud log service endpoint, The Operator cls resource by it
cls.manager.endpoint=127.0.0.1
+
+
diff --git
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/sort/SortPoller.java
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/sort/SortPoller.java
index 68dd242850..a602e0e979 100644
---
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/sort/SortPoller.java
+++
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/sort/SortPoller.java
@@ -17,8 +17,8 @@
package org.apache.inlong.manager.workflow.plugin.sort;
-import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.sort.SortStatusInfo;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import java.util.List;
@@ -30,11 +30,11 @@ public interface SortPoller {
/**
* Poll the Sort status infos by the given inlong groups
*
- * @param groupInfos inlong group infos
+ * @param streamInfos stream sink infos
* @param credentials credential info
* @return list of Sort status infos
* @throws Exception any exception if occurred
*/
- List<SortStatusInfo> pollSortStatus(List<InlongGroupInfo> groupInfos,
String credentials) throws Exception;
+ List<SortStatusInfo> pollSortStatus(List<InlongStreamInfo> streamInfos,
String credentials) throws Exception;
}