This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 7230ab677b [INLONG-10423][Manager] Modify unified configuration
related classes and interfaces (#10424)
7230ab677b is described below
commit 7230ab677b9f1c6b2b1d23ec67f6eba5d014b25b
Author: fuweng11 <[email protected]>
AuthorDate: Sat Jun 15 14:49:09 2024 +0800
[INLONG-10423][Manager] Modify unified configuration related classes and
interfaces (#10424)
---
.../apache/inlong/common/constant/SinkType.java | 2 ++
.../sort/dataflow/dataType/DataTypeConfig.java | 2 +-
.../deserialization/DeserializationConfig.java | 2 +-
.../common/pojo/sort/dataflow/sink/SinkConfig.java | 2 +-
.../common/pojo/sort/mq/MqClusterConfig.java | 2 +-
.../inlong/common/pojo/sort/node/NodeConfig.java | 2 +-
.../manager/common/consts/InlongConstants.java | 2 ++
.../inlong/manager/common/consts/StreamType.java | 2 +-
.../inlong/manager/pojo/sink/cls/ClsSink.java | 9 ++++++
.../inlong/manager/pojo/sink/kafka/KafkaSink.java | 5 +++
.../manager/pojo/sink/kafka/KafkaSinkRequest.java | 5 +++
.../pojo/sort/node/provider/DorisProvider.java | 1 -
.../manager/service/core/impl/SortServiceImpl.java | 17 ++++++++--
.../service/node/cls/ClsDataNodeOperator.java | 2 ++
.../node/es/ElasticsearchDataNodeOperator.java | 2 ++
.../service/node/kafka/KafkaDataNodeOperator.java | 2 ++
.../node/pulsar/PulsarDataNodeOperator.java | 2 ++
.../resource/sort/DefaultSortConfigOperator.java | 37 ++++++++++++++--------
.../manager/service/sink/AbstractSinkOperator.java | 6 ++--
.../manager/service/sink/StreamSinkOperator.java | 7 +++-
.../manager/service/sink/cls/ClsSinkOperator.java | 7 ++--
.../service/sink/es/ElasticsearchSinkOperator.java | 10 ++++--
.../service/sink/kafka/KafkaSinkOperator.java | 7 ++--
.../service/sink/pulsar/PulsarSinkOperator.java | 7 ++--
.../service/stream/InlongStreamServiceImpl.java | 1 -
.../web/controller/openapi/SortController.java | 2 +-
26 files changed, 109 insertions(+), 36 deletions(-)
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/constant/SinkType.java
b/inlong-common/src/main/java/org/apache/inlong/common/constant/SinkType.java
index 0dba17eb60..cd91b19464 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/constant/SinkType.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/constant/SinkType.java
@@ -23,4 +23,6 @@ public class SinkType {
public static final String PULSAR = "PULSAR";
public static final String CLS = "CLS";
public static final String ELASTICSEARCH = "ELASTICSEARCH";
+ public static final String STARROCKS = "STARROCKS";
+
}
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/dataType/DataTypeConfig.java
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/dataType/DataTypeConfig.java
index 3451e0abce..6f5f03fb48 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/dataType/DataTypeConfig.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/dataType/DataTypeConfig.java
@@ -24,7 +24,7 @@ import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTyp
import java.io.Serializable;
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY,
property = "type")
@JsonSubTypes({
@JsonSubTypes.Type(value = CsvConfig.class, name =
DeserializationType.CSV),
@JsonSubTypes.Type(value = KvConfig.class, name =
DeserializationType.KV),
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/deserialization/DeserializationConfig.java
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/deserialization/DeserializationConfig.java
index 1325e3930e..b854af7c90 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/deserialization/DeserializationConfig.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/deserialization/DeserializationConfig.java
@@ -24,7 +24,7 @@ import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTyp
import java.io.Serializable;
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY,
property = "type")
@JsonSubTypes({
@JsonSubTypes.Type(value = InlongMsgDeserializationConfig.class, name
= DeserializationType.INLONG_MSG),
@JsonSubTypes.Type(value = InlongMsgPbDeserialiationConfig.class, name
= DeserializationType.INLONG_MSG_PB),
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/sink/SinkConfig.java
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/sink/SinkConfig.java
index aa0d37ba3f..867d56dad6 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/sink/SinkConfig.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/sink/SinkConfig.java
@@ -28,7 +28,7 @@ import java.io.Serializable;
import java.util.List;
@Data
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY,
property = "type")
@JsonSubTypes({
@JsonSubTypes.Type(value = ClsSinkConfig.class, name = SinkType.CLS),
@JsonSubTypes.Type(value = EsSinkConfig.class, name =
SinkType.ELASTICSEARCH),
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/mq/MqClusterConfig.java
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/mq/MqClusterConfig.java
index 599d2b661f..f39593689c 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/mq/MqClusterConfig.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/mq/MqClusterConfig.java
@@ -28,7 +28,7 @@ import java.io.Serializable;
import java.util.List;
@Data
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY,
property = "type")
@JsonSubTypes({
@JsonSubTypes.Type(value = PulsarClusterConfig.class, name =
MQType.PULSAR),
@JsonSubTypes.Type(value = TubeClusterConfig.class, name =
MQType.TUBEMQ)
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/node/NodeConfig.java
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/node/NodeConfig.java
index 49b7c8e0f3..ce0248e56d 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/node/NodeConfig.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/node/NodeConfig.java
@@ -27,7 +27,7 @@ import java.io.Serializable;
import java.util.Map;
@Data
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY,
property = "type")
@JsonSubTypes({
@JsonSubTypes.Type(value = ClsNodeConfig.class, name =
DataNodeType.CLS),
@JsonSubTypes.Type(value = EsNodeConfig.class, name =
DataNodeType.ELASTICSEARCH),
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
index c3085972fd..820c22adcf 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
@@ -127,6 +127,8 @@ public class InlongConstants {
public static final String PULSAR_QUEUE_TYPE_PARALLEL = "PARALLEL";
+ public static final String DEFAULT_TASK = "DEFAULT_TASK";
+
/**
* Format of the Pulsar topic: "persistent://tenant/namespace/topic
*/
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/StreamType.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/StreamType.java
index f8f70dfe19..ed00f7d834 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/StreamType.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/StreamType.java
@@ -22,7 +22,7 @@ package org.apache.inlong.manager.common.consts;
*/
public class StreamType {
- @SupportSortType(sortType = SortType.SORT_FLINK)
+ @SupportSortType(sortType = SortType.SORT_STANDALONE)
public static final String KAFKA = "KAFKA";
@SupportSortType(sortType = SortType.SORT_FLINK)
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/cls/ClsSink.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/cls/ClsSink.java
index 275a242838..f9a0171693 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/cls/ClsSink.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/cls/ClsSink.java
@@ -78,6 +78,15 @@ public class ClsSink extends StreamSink {
@ApiModelProperty("Cloud log service topic storage duration")
private Integer storageDuration;
+ @ApiModelProperty("contentOffset")
+ private Integer contentOffset = 0;
+
+ @ApiModelProperty("fieldOffset")
+ private Integer fieldOffset;
+
+ @ApiModelProperty("separator")
+ private String separator;
+
public ClsSink() {
this.setSinkType(SinkType.CLS);
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSink.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSink.java
index 15e212d578..4e9f3f353e 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSink.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSink.java
@@ -31,6 +31,8 @@ import lombok.EqualsAndHashCode;
import lombok.ToString;
import lombok.experimental.SuperBuilder;
+import java.util.Map;
+
/**
* Kafka sink info
*/
@@ -61,6 +63,9 @@ public class KafkaSink extends StreamSink {
@ApiModelProperty("Primary key is required when serializationType is json,
avro")
private String primaryKey;
+ @ApiModelProperty("Properties for kafka")
+ private Map<String, Object> properties;
+
public KafkaSink() {
this.setSinkType(SinkType.KAFKA);
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSinkRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSinkRequest.java
index 899fba146a..387d24e6c5 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSinkRequest.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSinkRequest.java
@@ -28,6 +28,8 @@ import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
+import java.util.Map;
+
/**
* Kafka sink request.
*/
@@ -56,4 +58,7 @@ public class KafkaSinkRequest extends SinkRequest {
@ApiModelProperty("Primary key is required when serializationType is json,
avro")
private String primaryKey;
+ @ApiModelProperty("Properties for kafka")
+ private Map<String, Object> properties;
+
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/DorisProvider.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/DorisProvider.java
index 515ffb0936..dd7e15a2da 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/DorisProvider.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/DorisProvider.java
@@ -57,7 +57,6 @@ public class DorisProvider implements LoadNodeProvider {
List<FieldInfo> fieldInfos =
parseSinkFieldInfos(dorisSink.getSinkFieldList(), dorisSink.getSinkName());
List<FieldRelation> fieldRelations =
parseSinkFields(dorisSink.getSinkFieldList(), constantFieldMap);
Format format =
parsingSinkMultipleFormat(dorisSink.getSinkMultipleEnable(),
dorisSink.getSinkMultipleFormat());
- log.info("Test sink doris pro username ={}", dorisSink);
return new DorisLoadNode(
dorisSink.getSinkName(),
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
index 597c934252..507cc1fc1a 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
@@ -28,6 +28,7 @@ import
org.apache.inlong.common.pojo.sort.mq.PulsarClusterConfig;
import org.apache.inlong.common.pojo.sort.node.NodeConfig;
import org.apache.inlong.common.pojo.sortstandalone.SortClusterResponse;
import org.apache.inlong.common.util.Utils;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.plugin.Plugin;
@@ -64,6 +65,7 @@ import
org.springframework.transaction.annotation.Transactional;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -233,6 +235,7 @@ public class SortServiceImpl implements SortService,
PluginBinder {
}
private void reloadMqCluster() {
+ Map<String, List<MqClusterConfig>> tempMqClusterMap = new HashMap<>();
List<ClusterConfigEntity> clusterConfigEntityList =
configLoader.loadAllClusterConfigEntity();
clusterConfigEntityList.forEach(clusterConfigEntity -> {
String clusterTag = clusterConfigEntity.getClusterTag();
@@ -241,9 +244,10 @@ public class SortServiceImpl implements SortService,
PluginBinder {
JsonUtils.parseArray(clusterConfigEntity.getConfigParams(),
PulsarClusterConfig.class);
List<MqClusterConfig> list = new
ArrayList<>(pulsarClusterConfigs);
- mqClusterConfigMap.putIfAbsent(clusterTag, list);
+ tempMqClusterMap.putIfAbsent(clusterTag, list);
}
});
+ mqClusterConfigMap = tempMqClusterMap;
}
private void reloadNodeConfig() {
@@ -270,6 +274,11 @@ public class SortServiceImpl implements SortService,
PluginBinder {
Map<String, String> sortConfigMd5s = new HashMap<>();
Map<String, List<SortTaskConfig>> temp = new HashMap<>();
List<SortConfigEntity> sinkConfigEntityList =
configLoader.loadAllSortConfigEntity();
+ for (SortConfigEntity sortConfigEntity : sinkConfigEntityList) {
+ if (StringUtils.isBlank(sortConfigEntity.getSortTaskName())) {
+ sortConfigEntity.setSortTaskName(InlongConstants.DEFAULT_TASK);
+ }
+ }
Map<String, Map<String, Map<String, List<SortConfigEntity>>>>
cluster2SinkMap = sinkConfigEntityList.stream()
.collect(Collectors.groupingBy(SortConfigEntity::getInlongClusterName,
Collectors.groupingBy(SortConfigEntity::getSortTaskName,
@@ -296,9 +305,11 @@ public class SortServiceImpl implements SortService,
PluginBinder {
log.error("parse data flow config error for
sinkId={}", v.getSinkId(), e);
}
return null;
- }).filter(Objects::nonNull).collect(Collectors.toList());
+ }).filter(Objects::nonNull)
+ .sorted(Comparator.comparingInt(x ->
Integer.parseInt(x.getDataflowId())))
+ .collect(Collectors.toList());
SortClusterConfig sortClusterConfig =
SortClusterConfig.builder()
-
.mqClusterConfigs(mqClusterConfigMap.get(clusterTag))
+
.mqClusterConfigs(mqClusterConfigMap.getOrDefault(clusterTag, new
ArrayList<>()))
.clusterTag(clusterTag)
.dataFlowConfigs(dataFlowConfigs)
.build();
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/cls/ClsDataNodeOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/cls/ClsDataNodeOperator.java
index d9b78c2d08..65b7a17b25 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/cls/ClsDataNodeOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/cls/ClsDataNodeOperator.java
@@ -114,6 +114,8 @@ public class ClsDataNodeOperator extends
AbstractDataNodeOperator {
public NodeConfig getNodeConfig(DataNodeEntity dataNodeEntity) {
DataNodeInfo dataNodeInfo = this.getFromEntity(dataNodeEntity);
ClsNodeConfig clsNodeConfig =
CommonBeanUtils.copyProperties(dataNodeInfo, ClsNodeConfig::new);
+ ClsDataNodeDTO dto =
ClsDataNodeDTO.getFromJson(dataNodeEntity.getExtParams());
+ CommonBeanUtils.copyProperties(dto, clsNodeConfig);
clsNodeConfig.setNodeName(dataNodeInfo.getName());
return clsNodeConfig;
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/es/ElasticsearchDataNodeOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/es/ElasticsearchDataNodeOperator.java
index 0470beb265..1c9acada19 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/es/ElasticsearchDataNodeOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/es/ElasticsearchDataNodeOperator.java
@@ -124,7 +124,9 @@ public class ElasticsearchDataNodeOperator extends
AbstractDataNodeOperator {
@Override
public NodeConfig getNodeConfig(DataNodeEntity dataNodeEntity) {
DataNodeInfo dataNodeInfo = this.getFromEntity(dataNodeEntity);
+ ElasticsearchDataNodeDTO dto =
ElasticsearchDataNodeDTO.getFromJson(dataNodeInfo.getExtParams());
EsNodeConfig esNodeConfig =
CommonBeanUtils.copyProperties(dataNodeInfo, EsNodeConfig::new);
+ CommonBeanUtils.copyProperties(dto, esNodeConfig);
esNodeConfig.setNodeName(dataNodeInfo.getName());
return esNodeConfig;
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/kafka/KafkaDataNodeOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/kafka/KafkaDataNodeOperator.java
index 968a670628..bf57e85fe5 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/kafka/KafkaDataNodeOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/kafka/KafkaDataNodeOperator.java
@@ -125,6 +125,8 @@ public class KafkaDataNodeOperator extends
AbstractDataNodeOperator {
public NodeConfig getNodeConfig(DataNodeEntity dataNodeEntity) {
DataNodeInfo dataNodeInfo = this.getFromEntity(dataNodeEntity);
KafkaNodeConfig kafkaNodeConfig =
CommonBeanUtils.copyProperties(dataNodeInfo, KafkaNodeConfig::new);
+ // KafkaDataNodeDTO dto =
KafkaDataNodeDTO.getFromJson(dataNodeEntity.getExtParams());
+ // CommonBeanUtils.copyProperties(dto, kafkaNodeConfig, true);
kafkaNodeConfig.setNodeName(dataNodeInfo.getName());
return kafkaNodeConfig;
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/pulsar/PulsarDataNodeOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/pulsar/PulsarDataNodeOperator.java
index 1d7ba9e951..26a7fa0a55 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/pulsar/PulsarDataNodeOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/pulsar/PulsarDataNodeOperator.java
@@ -126,6 +126,8 @@ public class PulsarDataNodeOperator extends
AbstractDataNodeOperator {
public NodeConfig getNodeConfig(DataNodeEntity dataNodeEntity) {
DataNodeInfo dataNodeInfo = this.getFromEntity(dataNodeEntity);
PulsarNodeConfig pulsarNodeConfig =
CommonBeanUtils.copyProperties(dataNodeInfo, PulsarNodeConfig::new);
+ PulsarDataNodeDTO dto =
PulsarDataNodeDTO.getFromJson(dataNodeEntity.getExtParams());
+ CommonBeanUtils.copyProperties(dto, pulsarNodeConfig);
pulsarNodeConfig.setNodeName(dataNodeInfo.getName());
return pulsarNodeConfig;
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
index 670201c357..3a887911ff 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
@@ -36,6 +36,7 @@ import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.entity.SortConfigEntity;
+import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.dao.mapper.SortConfigEntityMapper;
@@ -62,6 +63,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.stream.Collectors;
@@ -72,6 +74,10 @@ public class DefaultSortConfigOperator implements
SortConfigOperator {
private static final Logger LOGGER =
LoggerFactory.getLogger(DefaultSortConfigOperator.class);
+ @Autowired
+ public DeserializeOperatorFactory deserializeOperatorFactory;
+ @Autowired
+ public DataTypeOperatorFactory dataTypeOperatorFactory;
@Autowired
private StreamSinkFieldEntityMapper sinkFieldMapper;
@Autowired
@@ -81,10 +87,6 @@ public class DefaultSortConfigOperator implements
SortConfigOperator {
@Autowired
private InlongGroupEntityMapper groupEntityMapper;
@Autowired
- public DeserializeOperatorFactory deserializeOperatorFactory;
- @Autowired
- public DataTypeOperatorFactory dataTypeOperatorFactory;
- @Autowired
private SinkOperatorFactory operatorFactory;
@Override
@@ -121,10 +123,8 @@ public class DefaultSortConfigOperator implements
SortConfigOperator {
}
InlongGroupEntity groupEntity =
groupEntityMapper.selectByGroupId(groupInfo.getInlongGroupId());
Preconditions.expectTrue(MQType.PULSAR.equals(groupEntity.getMqType()),
"standalone only support pulsar");
- for (StreamSink sink : streamInfo.getSinkList()) {
- if (SinkType.SORT_STANDALONE_SINK.contains(sink.getSinkType())) {
- saveDataFlow(groupInfo, streamInfo, sink);
- }
+ for (StreamSink sink : sinkList) {
+ saveDataFlow(groupInfo, streamInfo, sink);
}
}
@@ -139,14 +139,21 @@ public class DefaultSortConfigOperator implements
SortConfigOperator {
if (sortConfigEntity == null) {
dataFlowConfig.setVersion(0);
sortConfigEntity = CommonBeanUtils.copyProperties(sink,
SortConfigEntity::new);
+ sortConfigEntity.setId(null);
+ if (StringUtils.isBlank(sortConfigEntity.getSortTaskName())) {
+
sortConfigEntity.setSortTaskName(InlongConstants.DEFAULT_TASK);
+ }
sortConfigEntity.setSinkId(sink.getId());
sortConfigEntity.setConfigParams(objectMapper.writeValueAsString(dataFlowConfig));
sortConfigEntity.setInlongClusterTag(clusterTags);
sortConfigEntityMapper.insert(sortConfigEntity);
} else {
- dataFlowConfig.setVersion(sortConfigEntity.getVersion());
+ dataFlowConfig.setVersion(sortConfigEntity.getVersion() + 1);
sortConfigEntity.setConfigParams(objectMapper.writeValueAsString(dataFlowConfig));
sortConfigEntity.setInlongClusterTag(clusterTags);
+ if (StringUtils.isBlank(sortConfigEntity.getSortTaskName())) {
+
sortConfigEntity.setSortTaskName(InlongConstants.DEFAULT_TASK);
+ }
sortConfigEntityMapper.updateByIdSelective(sortConfigEntity);
}
} catch (Exception e) {
@@ -157,20 +164,23 @@ public class DefaultSortConfigOperator implements
SortConfigOperator {
}
private DataFlowConfig getDataFlowConfig(InlongGroupInfo groupInfo,
InlongStreamInfo streamInfo, StreamSink sink) {
+ HashMap<String, Object> properties = new HashMap<>();
return DataFlowConfig.builder()
.dataflowId(String.valueOf(sink.getId()))
.sourceConfig(getSourceConfig(groupInfo, streamInfo, sink))
.auditTag(String.valueOf(sink.getId()))
- .sinkConfig(getSinkConfig(sink))
+ .sinkConfig(getSinkConfig(groupInfo, streamInfo, sink))
.inlongGroupId(groupInfo.getInlongGroupId())
.inlongStreamId(streamInfo.getInlongStreamId())
+ .properties(properties)
.build();
}
- private SinkConfig getSinkConfig(StreamSink sink) {
+ private SinkConfig getSinkConfig(InlongGroupInfo groupInfo,
InlongStreamInfo streamInfo, StreamSink sink) {
StreamSinkOperator sinkOperator =
operatorFactory.getInstance(sink.getSinkType());
- return sinkOperator.getSinkConfig(sink);
+ return sinkOperator.getSinkConfig(groupInfo, streamInfo, sink);
}
+
private SourceConfig getSourceConfig(InlongGroupInfo groupInfo,
InlongStreamInfo streamInfo, StreamSink sink) {
List<InlongClusterEntity> pulsarClusters =
clusterMapper.selectByKey(groupInfo.getInlongClusterTag(),
null, MQType.PULSAR);
@@ -206,7 +216,8 @@ public class DefaultSortConfigOperator implements
SortConfigOperator {
dataTypeOperatorFactory.getInstance(DataTypeEnum.forType(streamInfo.getDataType()));
DataTypeConfig dataTypeConfig =
dataTypeOperator.getDataTypeConfig(streamInfo);
SourceConfig sourceConfig = new SourceConfig();
- List<FieldConfig> fields =
sinkFieldMapper.selectBySinkId(sink.getId()).stream().map(
+ List<StreamSinkFieldEntity> sinkFieldEntities =
sinkFieldMapper.selectBySinkId(sink.getId());
+ List<FieldConfig> fields = sinkFieldEntities.stream().map(
v -> {
FieldConfig fieldConfig = new FieldConfig();
FormatInfo formatInfo = FieldInfoUtils.convertFieldFormat(
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java
index 9ace6bff7f..64aa8f6dc7 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java
@@ -33,10 +33,12 @@ import
org.apache.inlong.manager.dao.mapper.SortConfigEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
import org.apache.inlong.manager.pojo.common.PageResult;
+import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.node.DataNodeInfo;
import org.apache.inlong.manager.pojo.sink.SinkField;
import org.apache.inlong.manager.pojo.sink.SinkRequest;
import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.stream.StreamField;
import org.apache.inlong.manager.service.node.DataNodeOperateHelper;
@@ -168,7 +170,7 @@ public abstract class AbstractSinkOperator implements
StreamSinkOperator {
throw new
BusinessException(ErrorCodeEnum.SINK_FIELD_UPDATE_NOT_ALLOWED);
}
for (int i = 0; i < existsFieldList.size(); i++) {
- if
(!existsFieldList.get(i).getFieldName().equals(fieldRequestList.get(i).getFieldName()))
{
+ if
(!existsFieldList.get(i).getFieldName().equalsIgnoreCase(fieldRequestList.get(i).getFieldName()))
{
throw new
BusinessException(ErrorCodeEnum.SINK_FIELD_UPDATE_NOT_ALLOWED);
}
}
@@ -288,7 +290,7 @@ public abstract class AbstractSinkOperator implements
StreamSinkOperator {
}
@Override
- public SinkConfig getSinkConfig(StreamSink sink) {
+ public SinkConfig getSinkConfig(InlongGroupInfo groupInfo,
InlongStreamInfo streamInfo, StreamSink sink) {
throw new BusinessException(String.format("not support get sink config
for sink type=%s", sink.getSinkType()));
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkOperator.java
index 87e3924b38..906fbc36f0 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkOperator.java
@@ -21,10 +21,12 @@ import
org.apache.inlong.common.pojo.sort.dataflow.sink.SinkConfig;
import org.apache.inlong.manager.common.enums.SinkStatus;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.pojo.common.PageResult;
+import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.node.DataNodeInfo;
import org.apache.inlong.manager.pojo.sink.SinkField;
import org.apache.inlong.manager.pojo.sink.SinkRequest;
import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.stream.StreamField;
import com.github.pagehelper.Page;
@@ -131,8 +133,11 @@ public interface StreamSinkOperator {
/**
* Get the sink config.
*
+ * @param groupInfo inlong group info
+ * @param streamInfo inlong stream info
* @param sink sink info
* @return sink config
*/
- SinkConfig getSinkConfig(StreamSink sink);
+ SinkConfig getSinkConfig(InlongGroupInfo groupInfo, InlongStreamInfo
streamInfo, StreamSink sink);
+
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/cls/ClsSinkOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/cls/ClsSinkOperator.java
index 23aa02166e..0012a42d43 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/cls/ClsSinkOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/cls/ClsSinkOperator.java
@@ -32,6 +32,7 @@ import org.apache.inlong.manager.dao.entity.DataNodeEntity;
import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.dao.mapper.DataNodeEntityMapper;
+import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.node.DataNodeInfo;
import org.apache.inlong.manager.pojo.node.cls.ClsDataNodeDTO;
import org.apache.inlong.manager.pojo.node.cls.ClsDataNodeInfo;
@@ -43,6 +44,7 @@ import org.apache.inlong.manager.pojo.sink.cls.ClsSinkDTO;
import org.apache.inlong.manager.pojo.sink.cls.ClsSinkRequest;
import org.apache.inlong.manager.pojo.sort.util.FieldInfoUtils;
import org.apache.inlong.manager.pojo.stream.InlongStreamExtParam;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -147,8 +149,9 @@ public class ClsSinkOperator extends AbstractSinkOperator {
}
@Override
- public SinkConfig getSinkConfig(StreamSink sink) {
- ClsSinkConfig sinkConfig = CommonBeanUtils.copyProperties(sink,
ClsSinkConfig::new);
+ public SinkConfig getSinkConfig(InlongGroupInfo groupInfo,
InlongStreamInfo streamInfo, StreamSink sink) {
+ ClsSink clsSink = (ClsSink) sink;
+ ClsSinkConfig sinkConfig = CommonBeanUtils.copyProperties(clsSink,
ClsSinkConfig::new);
List<FieldConfig> fields =
sinkFieldMapper.selectBySinkId(sink.getId()).stream().map(
v -> {
FieldConfig fieldConfig = new FieldConfig();
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java
index 659b934f8a..753ae75aa6 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java
@@ -30,6 +30,7 @@ import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
+import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.node.DataNodeInfo;
import org.apache.inlong.manager.pojo.sink.SinkField;
import org.apache.inlong.manager.pojo.sink.SinkRequest;
@@ -40,6 +41,7 @@ import org.apache.inlong.manager.pojo.sink.es.ElasticsearchSinkDTO;
import org.apache.inlong.manager.pojo.sink.es.ElasticsearchSinkRequest;
import org.apache.inlong.manager.pojo.sort.util.FieldInfoUtils;
import org.apache.inlong.manager.pojo.stream.InlongStreamExtParam;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -193,8 +195,12 @@ public class ElasticsearchSinkOperator extends AbstractSinkOperator {
}
@Override
- public SinkConfig getSinkConfig(StreamSink sink) {
- EsSinkConfig sinkConfig = CommonBeanUtils.copyProperties(sink, EsSinkConfig::new);
+ public SinkConfig getSinkConfig(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo, StreamSink sink) {
+ ElasticsearchSink elasticsearchSink = (ElasticsearchSink) sink;
+ StreamSinkEntity streamSinkEntity = sinkMapper.selectByPrimaryKey(sink.getId());
+ ElasticsearchSinkDTO elasticsearchSinkDTO = ElasticsearchSinkDTO.getFromJson(streamSinkEntity.getExtParams());
+ EsSinkConfig sinkConfig = CommonBeanUtils.copyProperties(elasticsearchSink, EsSinkConfig::new);
+ CommonBeanUtils.copyProperties(elasticsearchSinkDTO, sinkConfig);
List<FieldConfig> fields = sinkFieldMapper.selectBySinkId(sink.getId()).stream().map(
v -> {
FieldConfig fieldConfig = new FieldConfig();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kafka/KafkaSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kafka/KafkaSinkOperator.java
index 169e7d1461..3ce54c7d5e 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kafka/KafkaSinkOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kafka/KafkaSinkOperator.java
@@ -26,6 +26,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
+import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.node.DataNodeInfo;
import org.apache.inlong.manager.pojo.sink.SinkField;
import org.apache.inlong.manager.pojo.sink.SinkRequest;
@@ -34,6 +35,7 @@ import org.apache.inlong.manager.pojo.sink.kafka.KafkaSink;
import org.apache.inlong.manager.pojo.sink.kafka.KafkaSinkDTO;
import org.apache.inlong.manager.pojo.sink.kafka.KafkaSinkRequest;
import org.apache.inlong.manager.pojo.sort.util.FieldInfoUtils;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
import com.fasterxml.jackson.core.JsonProcessingException;
@@ -119,8 +121,9 @@ public class KafkaSinkOperator extends AbstractSinkOperator {
}
@Override
- public SinkConfig getSinkConfig(StreamSink sink) {
- KafkaSinkConfig sinkConfig = CommonBeanUtils.copyProperties(sink, KafkaSinkConfig::new);
+ public SinkConfig getSinkConfig(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo, StreamSink sink) {
+ KafkaSink kafkaSink = (KafkaSink) sink;
+ KafkaSinkConfig sinkConfig = CommonBeanUtils.copyProperties(kafkaSink, KafkaSinkConfig::new);
List<FieldConfig> fields = sinkFieldMapper.selectBySinkId(sink.getId()).stream().map(
v -> {
FieldConfig fieldConfig = new FieldConfig();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/pulsar/PulsarSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/pulsar/PulsarSinkOperator.java
index 7a84d5aee3..50ce6bcd72 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/pulsar/PulsarSinkOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/pulsar/PulsarSinkOperator.java
@@ -30,6 +30,7 @@ import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.dao.entity.DataNodeEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.dao.mapper.DataNodeEntityMapper;
+import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.node.DataNodeInfo;
import org.apache.inlong.manager.pojo.node.pulsar.PulsarDataNodeDTO;
import org.apache.inlong.manager.pojo.sink.SinkField;
@@ -39,6 +40,7 @@ import org.apache.inlong.manager.pojo.sink.pulsar.PulsarSink;
import org.apache.inlong.manager.pojo.sink.pulsar.PulsarSinkDTO;
import org.apache.inlong.manager.pojo.sink.pulsar.PulsarSinkRequest;
import org.apache.inlong.manager.pojo.sort.util.FieldInfoUtils;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
import com.fasterxml.jackson.core.JsonProcessingException;
@@ -137,8 +139,9 @@ public class PulsarSinkOperator extends AbstractSinkOperator {
}
@Override
- public SinkConfig getSinkConfig(StreamSink sink) {
- PulsarSinkConfig sinkConfig = CommonBeanUtils.copyProperties(sink, PulsarSinkConfig::new);
+ public SinkConfig getSinkConfig(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo, StreamSink sink) {
+ PulsarSink pulsarSink = (PulsarSink) sink;
+ PulsarSinkConfig sinkConfig = CommonBeanUtils.copyProperties(pulsarSink, PulsarSinkConfig::new);
List<FieldConfig> fields = sinkFieldMapper.selectBySinkId(sink.getId()).stream().map(
v -> {
FieldConfig fieldConfig = new FieldConfig();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
index 281858abb2..9167ab02c7 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
@@ -606,7 +606,6 @@ public class InlongStreamServiceImpl implements InlongStreamService {
List<InlongStreamExtInfo> extList = request.getExtList();
saveOrUpdateExt(groupId, streamId, extList);
if (request.getSyncField()) {
- LOGGER.info("test begin sync field={}", request);
List<StreamSinkEntity> sinkEntityList = sinkMapper.selectByRelatedId(groupId, streamId);
for (StreamSinkEntity sinkEntity : sinkEntityList) {
StreamSinkOperator sinkOperator = sinkOperatorFactory.getInstance(sinkEntity.getSinkType());
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/SortController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/SortController.java
index a8d5faad54..887ecf8b9e 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/SortController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/SortController.java
@@ -62,7 +62,7 @@ public class SortController {
@ApiOperation(value = "get sort config")
public SortConfigResponse getSortConfig(
@RequestParam String clusterName,
- @RequestParam String md5) {
+ @RequestParam(required = false) String md5) {
return sortService.getSortConfig(clusterName, md5);
}