This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 7230ab677b [INLONG-10423][Manager] Modify unified configuration 
related classes and interfaces (#10424)
7230ab677b is described below

commit 7230ab677b9f1c6b2b1d23ec67f6eba5d014b25b
Author: fuweng11 <[email protected]>
AuthorDate: Sat Jun 15 14:49:09 2024 +0800

    [INLONG-10423][Manager] Modify unified configuration related classes and 
interfaces (#10424)
---
 .../apache/inlong/common/constant/SinkType.java    |  2 ++
 .../sort/dataflow/dataType/DataTypeConfig.java     |  2 +-
 .../deserialization/DeserializationConfig.java     |  2 +-
 .../common/pojo/sort/dataflow/sink/SinkConfig.java |  2 +-
 .../common/pojo/sort/mq/MqClusterConfig.java       |  2 +-
 .../inlong/common/pojo/sort/node/NodeConfig.java   |  2 +-
 .../manager/common/consts/InlongConstants.java     |  2 ++
 .../inlong/manager/common/consts/StreamType.java   |  2 +-
 .../inlong/manager/pojo/sink/cls/ClsSink.java      |  9 ++++++
 .../inlong/manager/pojo/sink/kafka/KafkaSink.java  |  5 +++
 .../manager/pojo/sink/kafka/KafkaSinkRequest.java  |  5 +++
 .../pojo/sort/node/provider/DorisProvider.java     |  1 -
 .../manager/service/core/impl/SortServiceImpl.java | 17 ++++++++--
 .../service/node/cls/ClsDataNodeOperator.java      |  2 ++
 .../node/es/ElasticsearchDataNodeOperator.java     |  2 ++
 .../service/node/kafka/KafkaDataNodeOperator.java  |  2 ++
 .../node/pulsar/PulsarDataNodeOperator.java        |  2 ++
 .../resource/sort/DefaultSortConfigOperator.java   | 37 ++++++++++++++--------
 .../manager/service/sink/AbstractSinkOperator.java |  6 ++--
 .../manager/service/sink/StreamSinkOperator.java   |  7 +++-
 .../manager/service/sink/cls/ClsSinkOperator.java  |  7 ++--
 .../service/sink/es/ElasticsearchSinkOperator.java | 10 ++++--
 .../service/sink/kafka/KafkaSinkOperator.java      |  7 ++--
 .../service/sink/pulsar/PulsarSinkOperator.java    |  7 ++--
 .../service/stream/InlongStreamServiceImpl.java    |  1 -
 .../web/controller/openapi/SortController.java     |  2 +-
 26 files changed, 109 insertions(+), 36 deletions(-)

diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/constant/SinkType.java 
b/inlong-common/src/main/java/org/apache/inlong/common/constant/SinkType.java
index 0dba17eb60..cd91b19464 100644
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/constant/SinkType.java
+++ 
b/inlong-common/src/main/java/org/apache/inlong/common/constant/SinkType.java
@@ -23,4 +23,6 @@ public class SinkType {
     public static final String PULSAR = "PULSAR";
     public static final String CLS = "CLS";
     public static final String ELASTICSEARCH = "ELASTICSEARCH";
+    public static final String STARROCKS = "STARROCKS";
+
 }
diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/dataType/DataTypeConfig.java
 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/dataType/DataTypeConfig.java
index 3451e0abce..6f5f03fb48 100644
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/dataType/DataTypeConfig.java
+++ 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/dataType/DataTypeConfig.java
@@ -24,7 +24,7 @@ import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTyp
 
 import java.io.Serializable;
 
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, 
property = "type")
 @JsonSubTypes({
         @JsonSubTypes.Type(value = CsvConfig.class, name = 
DeserializationType.CSV),
         @JsonSubTypes.Type(value = KvConfig.class, name = 
DeserializationType.KV),
diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/deserialization/DeserializationConfig.java
 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/deserialization/DeserializationConfig.java
index 1325e3930e..b854af7c90 100644
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/deserialization/DeserializationConfig.java
+++ 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/deserialization/DeserializationConfig.java
@@ -24,7 +24,7 @@ import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTyp
 
 import java.io.Serializable;
 
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, 
property = "type")
 @JsonSubTypes({
         @JsonSubTypes.Type(value = InlongMsgDeserializationConfig.class, name 
= DeserializationType.INLONG_MSG),
         @JsonSubTypes.Type(value = InlongMsgPbDeserialiationConfig.class, name 
= DeserializationType.INLONG_MSG_PB),
diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/sink/SinkConfig.java
 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/sink/SinkConfig.java
index aa0d37ba3f..867d56dad6 100644
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/sink/SinkConfig.java
+++ 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/sink/SinkConfig.java
@@ -28,7 +28,7 @@ import java.io.Serializable;
 import java.util.List;
 
 @Data
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, 
property = "type")
 @JsonSubTypes({
         @JsonSubTypes.Type(value = ClsSinkConfig.class, name = SinkType.CLS),
         @JsonSubTypes.Type(value = EsSinkConfig.class, name = 
SinkType.ELASTICSEARCH),
diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/mq/MqClusterConfig.java
 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/mq/MqClusterConfig.java
index 599d2b661f..f39593689c 100644
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/mq/MqClusterConfig.java
+++ 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/mq/MqClusterConfig.java
@@ -28,7 +28,7 @@ import java.io.Serializable;
 import java.util.List;
 
 @Data
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, 
property = "type")
 @JsonSubTypes({
         @JsonSubTypes.Type(value = PulsarClusterConfig.class, name = 
MQType.PULSAR),
         @JsonSubTypes.Type(value = TubeClusterConfig.class, name = 
MQType.TUBEMQ)
diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/node/NodeConfig.java
 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/node/NodeConfig.java
index 49b7c8e0f3..ce0248e56d 100644
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/node/NodeConfig.java
+++ 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/node/NodeConfig.java
@@ -27,7 +27,7 @@ import java.io.Serializable;
 import java.util.Map;
 
 @Data
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, 
property = "type")
 @JsonSubTypes({
         @JsonSubTypes.Type(value = ClsNodeConfig.class, name = 
DataNodeType.CLS),
         @JsonSubTypes.Type(value = EsNodeConfig.class, name = 
DataNodeType.ELASTICSEARCH),
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
index c3085972fd..820c22adcf 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
@@ -127,6 +127,8 @@ public class InlongConstants {
 
     public static final String PULSAR_QUEUE_TYPE_PARALLEL = "PARALLEL";
 
+    public static final String DEFAULT_TASK = "DEFAULT_TASK";
+
     /**
      * Format of the Pulsar topic: "persistent://tenant/namespace/topic
      */
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/StreamType.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/StreamType.java
index f8f70dfe19..ed00f7d834 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/StreamType.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/StreamType.java
@@ -22,7 +22,7 @@ package org.apache.inlong.manager.common.consts;
  */
 public class StreamType {
 
-    @SupportSortType(sortType = SortType.SORT_FLINK)
+    @SupportSortType(sortType = SortType.SORT_STANDALONE)
     public static final String KAFKA = "KAFKA";
 
     @SupportSortType(sortType = SortType.SORT_FLINK)
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/cls/ClsSink.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/cls/ClsSink.java
index 275a242838..f9a0171693 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/cls/ClsSink.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/cls/ClsSink.java
@@ -78,6 +78,15 @@ public class ClsSink extends StreamSink {
     @ApiModelProperty("Cloud log service topic storage duration")
     private Integer storageDuration;
 
+    @ApiModelProperty("contentOffset")
+    private Integer contentOffset = 0;
+
+    @ApiModelProperty("fieldOffset")
+    private Integer fieldOffset;
+
+    @ApiModelProperty("separator")
+    private String separator;
+
     public ClsSink() {
         this.setSinkType(SinkType.CLS);
     }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSink.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSink.java
index 15e212d578..4e9f3f353e 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSink.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSink.java
@@ -31,6 +31,8 @@ import lombok.EqualsAndHashCode;
 import lombok.ToString;
 import lombok.experimental.SuperBuilder;
 
+import java.util.Map;
+
 /**
  * Kafka sink info
  */
@@ -61,6 +63,9 @@ public class KafkaSink extends StreamSink {
     @ApiModelProperty("Primary key is required when serializationType is json, 
avro")
     private String primaryKey;
 
+    @ApiModelProperty("Properties for kafka")
+    private Map<String, Object> properties;
+
     public KafkaSink() {
         this.setSinkType(SinkType.KAFKA);
     }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSinkRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSinkRequest.java
index 899fba146a..387d24e6c5 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSinkRequest.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSinkRequest.java
@@ -28,6 +28,8 @@ import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
 
+import java.util.Map;
+
 /**
  * Kafka sink request.
  */
@@ -56,4 +58,7 @@ public class KafkaSinkRequest extends SinkRequest {
     @ApiModelProperty("Primary key is required when serializationType is json, 
avro")
     private String primaryKey;
 
+    @ApiModelProperty("Properties for kafka")
+    private Map<String, Object> properties;
+
 }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/DorisProvider.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/DorisProvider.java
index 515ffb0936..dd7e15a2da 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/DorisProvider.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/DorisProvider.java
@@ -57,7 +57,6 @@ public class DorisProvider implements LoadNodeProvider {
         List<FieldInfo> fieldInfos = 
parseSinkFieldInfos(dorisSink.getSinkFieldList(), dorisSink.getSinkName());
         List<FieldRelation> fieldRelations = 
parseSinkFields(dorisSink.getSinkFieldList(), constantFieldMap);
         Format format = 
parsingSinkMultipleFormat(dorisSink.getSinkMultipleEnable(), 
dorisSink.getSinkMultipleFormat());
-        log.info("Test sink doris pro username ={}", dorisSink);
 
         return new DorisLoadNode(
                 dorisSink.getSinkName(),
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
index 597c934252..507cc1fc1a 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
@@ -28,6 +28,7 @@ import 
org.apache.inlong.common.pojo.sort.mq.PulsarClusterConfig;
 import org.apache.inlong.common.pojo.sort.node.NodeConfig;
 import org.apache.inlong.common.pojo.sortstandalone.SortClusterResponse;
 import org.apache.inlong.common.util.Utils;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.plugin.Plugin;
@@ -64,6 +65,7 @@ import 
org.springframework.transaction.annotation.Transactional;
 import javax.annotation.PostConstruct;
 
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -233,6 +235,7 @@ public class SortServiceImpl implements SortService, 
PluginBinder {
     }
 
     private void reloadMqCluster() {
+        Map<String, List<MqClusterConfig>> tempMqClusterMap = new HashMap<>();
         List<ClusterConfigEntity> clusterConfigEntityList = 
configLoader.loadAllClusterConfigEntity();
         clusterConfigEntityList.forEach(clusterConfigEntity -> {
             String clusterTag = clusterConfigEntity.getClusterTag();
@@ -241,9 +244,10 @@ public class SortServiceImpl implements SortService, 
PluginBinder {
                         
JsonUtils.parseArray(clusterConfigEntity.getConfigParams(),
                                 PulsarClusterConfig.class);
                 List<MqClusterConfig> list = new 
ArrayList<>(pulsarClusterConfigs);
-                mqClusterConfigMap.putIfAbsent(clusterTag, list);
+                tempMqClusterMap.putIfAbsent(clusterTag, list);
             }
         });
+        mqClusterConfigMap = tempMqClusterMap;
     }
 
     private void reloadNodeConfig() {
@@ -270,6 +274,11 @@ public class SortServiceImpl implements SortService, 
PluginBinder {
         Map<String, String> sortConfigMd5s = new HashMap<>();
         Map<String, List<SortTaskConfig>> temp = new HashMap<>();
         List<SortConfigEntity> sinkConfigEntityList = 
configLoader.loadAllSortConfigEntity();
+        for (SortConfigEntity sortConfigEntity : sinkConfigEntityList) {
+            if (StringUtils.isBlank(sortConfigEntity.getSortTaskName())) {
+                sortConfigEntity.setSortTaskName(InlongConstants.DEFAULT_TASK);
+            }
+        }
         Map<String, Map<String, Map<String, List<SortConfigEntity>>>> 
cluster2SinkMap = sinkConfigEntityList.stream()
                 
.collect(Collectors.groupingBy(SortConfigEntity::getInlongClusterName,
                         
Collectors.groupingBy(SortConfigEntity::getSortTaskName,
@@ -296,9 +305,11 @@ public class SortServiceImpl implements SortService, 
PluginBinder {
                             log.error("parse data flow config error for 
sinkId={}", v.getSinkId(), e);
                         }
                         return null;
-                    }).filter(Objects::nonNull).collect(Collectors.toList());
+                    }).filter(Objects::nonNull)
+                            .sorted(Comparator.comparingInt(x -> 
Integer.parseInt(x.getDataflowId())))
+                            .collect(Collectors.toList());
                     SortClusterConfig sortClusterConfig = 
SortClusterConfig.builder()
-                            
.mqClusterConfigs(mqClusterConfigMap.get(clusterTag))
+                            
.mqClusterConfigs(mqClusterConfigMap.getOrDefault(clusterTag, new 
ArrayList<>()))
                             .clusterTag(clusterTag)
                             .dataFlowConfigs(dataFlowConfigs)
                             .build();
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/cls/ClsDataNodeOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/cls/ClsDataNodeOperator.java
index d9b78c2d08..65b7a17b25 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/cls/ClsDataNodeOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/cls/ClsDataNodeOperator.java
@@ -114,6 +114,8 @@ public class ClsDataNodeOperator extends 
AbstractDataNodeOperator {
     public NodeConfig getNodeConfig(DataNodeEntity dataNodeEntity) {
         DataNodeInfo dataNodeInfo = this.getFromEntity(dataNodeEntity);
         ClsNodeConfig clsNodeConfig = 
CommonBeanUtils.copyProperties(dataNodeInfo, ClsNodeConfig::new);
+        ClsDataNodeDTO dto = 
ClsDataNodeDTO.getFromJson(dataNodeEntity.getExtParams());
+        CommonBeanUtils.copyProperties(dto, clsNodeConfig);
         clsNodeConfig.setNodeName(dataNodeInfo.getName());
         return clsNodeConfig;
     }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/es/ElasticsearchDataNodeOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/es/ElasticsearchDataNodeOperator.java
index 0470beb265..1c9acada19 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/es/ElasticsearchDataNodeOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/es/ElasticsearchDataNodeOperator.java
@@ -124,7 +124,9 @@ public class ElasticsearchDataNodeOperator extends 
AbstractDataNodeOperator {
     @Override
     public NodeConfig getNodeConfig(DataNodeEntity dataNodeEntity) {
         DataNodeInfo dataNodeInfo = this.getFromEntity(dataNodeEntity);
+        ElasticsearchDataNodeDTO dto = 
ElasticsearchDataNodeDTO.getFromJson(dataNodeInfo.getExtParams());
         EsNodeConfig esNodeConfig = 
CommonBeanUtils.copyProperties(dataNodeInfo, EsNodeConfig::new);
+        CommonBeanUtils.copyProperties(dto, esNodeConfig);
         esNodeConfig.setNodeName(dataNodeInfo.getName());
         return esNodeConfig;
     }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/kafka/KafkaDataNodeOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/kafka/KafkaDataNodeOperator.java
index 968a670628..bf57e85fe5 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/kafka/KafkaDataNodeOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/kafka/KafkaDataNodeOperator.java
@@ -125,6 +125,8 @@ public class KafkaDataNodeOperator extends 
AbstractDataNodeOperator {
     public NodeConfig getNodeConfig(DataNodeEntity dataNodeEntity) {
         DataNodeInfo dataNodeInfo = this.getFromEntity(dataNodeEntity);
         KafkaNodeConfig kafkaNodeConfig = 
CommonBeanUtils.copyProperties(dataNodeInfo, KafkaNodeConfig::new);
+        // KafkaDataNodeDTO dto = 
KafkaDataNodeDTO.getFromJson(dataNodeEntity.getExtParams());
+        // CommonBeanUtils.copyProperties(dto, kafkaNodeConfig, true);
         kafkaNodeConfig.setNodeName(dataNodeInfo.getName());
         return kafkaNodeConfig;
     }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/pulsar/PulsarDataNodeOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/pulsar/PulsarDataNodeOperator.java
index 1d7ba9e951..26a7fa0a55 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/pulsar/PulsarDataNodeOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/pulsar/PulsarDataNodeOperator.java
@@ -126,6 +126,8 @@ public class PulsarDataNodeOperator extends 
AbstractDataNodeOperator {
     public NodeConfig getNodeConfig(DataNodeEntity dataNodeEntity) {
         DataNodeInfo dataNodeInfo = this.getFromEntity(dataNodeEntity);
         PulsarNodeConfig pulsarNodeConfig = 
CommonBeanUtils.copyProperties(dataNodeInfo, PulsarNodeConfig::new);
+        PulsarDataNodeDTO dto = 
PulsarDataNodeDTO.getFromJson(dataNodeEntity.getExtParams());
+        CommonBeanUtils.copyProperties(dto, pulsarNodeConfig);
         pulsarNodeConfig.setNodeName(dataNodeInfo.getName());
         return pulsarNodeConfig;
     }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
index 670201c357..3a887911ff 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
@@ -36,6 +36,7 @@ import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
 import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
 import org.apache.inlong.manager.dao.entity.SortConfigEntity;
+import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
 import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper;
 import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
 import org.apache.inlong.manager.dao.mapper.SortConfigEntityMapper;
@@ -62,6 +63,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -72,6 +74,10 @@ public class DefaultSortConfigOperator implements 
SortConfigOperator {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(DefaultSortConfigOperator.class);
 
+    @Autowired
+    public DeserializeOperatorFactory deserializeOperatorFactory;
+    @Autowired
+    public DataTypeOperatorFactory dataTypeOperatorFactory;
     @Autowired
     private StreamSinkFieldEntityMapper sinkFieldMapper;
     @Autowired
@@ -81,10 +87,6 @@ public class DefaultSortConfigOperator implements 
SortConfigOperator {
     @Autowired
     private InlongGroupEntityMapper groupEntityMapper;
     @Autowired
-    public DeserializeOperatorFactory deserializeOperatorFactory;
-    @Autowired
-    public DataTypeOperatorFactory dataTypeOperatorFactory;
-    @Autowired
     private SinkOperatorFactory operatorFactory;
 
     @Override
@@ -121,10 +123,8 @@ public class DefaultSortConfigOperator implements 
SortConfigOperator {
         }
         InlongGroupEntity groupEntity = 
groupEntityMapper.selectByGroupId(groupInfo.getInlongGroupId());
         
Preconditions.expectTrue(MQType.PULSAR.equals(groupEntity.getMqType()), 
"standalone only support pulsar");
-        for (StreamSink sink : streamInfo.getSinkList()) {
-            if (SinkType.SORT_STANDALONE_SINK.contains(sink.getSinkType())) {
-                saveDataFlow(groupInfo, streamInfo, sink);
-            }
+        for (StreamSink sink : sinkList) {
+            saveDataFlow(groupInfo, streamInfo, sink);
         }
 
     }
@@ -139,14 +139,21 @@ public class DefaultSortConfigOperator implements 
SortConfigOperator {
             if (sortConfigEntity == null) {
                 dataFlowConfig.setVersion(0);
                 sortConfigEntity = CommonBeanUtils.copyProperties(sink, 
SortConfigEntity::new);
+                sortConfigEntity.setId(null);
+                if (StringUtils.isBlank(sortConfigEntity.getSortTaskName())) {
+                    
sortConfigEntity.setSortTaskName(InlongConstants.DEFAULT_TASK);
+                }
                 sortConfigEntity.setSinkId(sink.getId());
                 
sortConfigEntity.setConfigParams(objectMapper.writeValueAsString(dataFlowConfig));
                 sortConfigEntity.setInlongClusterTag(clusterTags);
                 sortConfigEntityMapper.insert(sortConfigEntity);
             } else {
-                dataFlowConfig.setVersion(sortConfigEntity.getVersion());
+                dataFlowConfig.setVersion(sortConfigEntity.getVersion() + 1);
                 
sortConfigEntity.setConfigParams(objectMapper.writeValueAsString(dataFlowConfig));
                 sortConfigEntity.setInlongClusterTag(clusterTags);
+                if (StringUtils.isBlank(sortConfigEntity.getSortTaskName())) {
+                    
sortConfigEntity.setSortTaskName(InlongConstants.DEFAULT_TASK);
+                }
                 sortConfigEntityMapper.updateByIdSelective(sortConfigEntity);
             }
         } catch (Exception e) {
@@ -157,20 +164,23 @@ public class DefaultSortConfigOperator implements 
SortConfigOperator {
     }
 
     private DataFlowConfig getDataFlowConfig(InlongGroupInfo groupInfo, 
InlongStreamInfo streamInfo, StreamSink sink) {
+        HashMap<String, Object> properties = new HashMap<>();
         return DataFlowConfig.builder()
                 .dataflowId(String.valueOf(sink.getId()))
                 .sourceConfig(getSourceConfig(groupInfo, streamInfo, sink))
                 .auditTag(String.valueOf(sink.getId()))
-                .sinkConfig(getSinkConfig(sink))
+                .sinkConfig(getSinkConfig(groupInfo, streamInfo, sink))
                 .inlongGroupId(groupInfo.getInlongGroupId())
                 .inlongStreamId(streamInfo.getInlongStreamId())
+                .properties(properties)
                 .build();
     }
 
-    private SinkConfig getSinkConfig(StreamSink sink) {
+    private SinkConfig getSinkConfig(InlongGroupInfo groupInfo, 
InlongStreamInfo streamInfo, StreamSink sink) {
         StreamSinkOperator sinkOperator = 
operatorFactory.getInstance(sink.getSinkType());
-        return sinkOperator.getSinkConfig(sink);
+        return sinkOperator.getSinkConfig(groupInfo, streamInfo, sink);
     }
+
     private SourceConfig getSourceConfig(InlongGroupInfo groupInfo, 
InlongStreamInfo streamInfo, StreamSink sink) {
         List<InlongClusterEntity> pulsarClusters =
                 clusterMapper.selectByKey(groupInfo.getInlongClusterTag(), 
null, MQType.PULSAR);
@@ -206,7 +216,8 @@ public class DefaultSortConfigOperator implements 
SortConfigOperator {
                 
dataTypeOperatorFactory.getInstance(DataTypeEnum.forType(streamInfo.getDataType()));
         DataTypeConfig dataTypeConfig = 
dataTypeOperator.getDataTypeConfig(streamInfo);
         SourceConfig sourceConfig = new SourceConfig();
-        List<FieldConfig> fields = 
sinkFieldMapper.selectBySinkId(sink.getId()).stream().map(
+        List<StreamSinkFieldEntity> sinkFieldEntities = 
sinkFieldMapper.selectBySinkId(sink.getId());
+        List<FieldConfig> fields = sinkFieldEntities.stream().map(
                 v -> {
                     FieldConfig fieldConfig = new FieldConfig();
                     FormatInfo formatInfo = FieldInfoUtils.convertFieldFormat(
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java
index 9ace6bff7f..64aa8f6dc7 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java
@@ -33,10 +33,12 @@ import 
org.apache.inlong.manager.dao.mapper.SortConfigEntityMapper;
 import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
 import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
 import org.apache.inlong.manager.pojo.common.PageResult;
+import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.node.DataNodeInfo;
 import org.apache.inlong.manager.pojo.sink.SinkField;
 import org.apache.inlong.manager.pojo.sink.SinkRequest;
 import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.pojo.stream.StreamField;
 import org.apache.inlong.manager.service.node.DataNodeOperateHelper;
 
@@ -168,7 +170,7 @@ public abstract class AbstractSinkOperator implements 
StreamSinkOperator {
                 throw new 
BusinessException(ErrorCodeEnum.SINK_FIELD_UPDATE_NOT_ALLOWED);
             }
             for (int i = 0; i < existsFieldList.size(); i++) {
-                if 
(!existsFieldList.get(i).getFieldName().equals(fieldRequestList.get(i).getFieldName()))
 {
+                if 
(!existsFieldList.get(i).getFieldName().equalsIgnoreCase(fieldRequestList.get(i).getFieldName()))
 {
                     throw new 
BusinessException(ErrorCodeEnum.SINK_FIELD_UPDATE_NOT_ALLOWED);
                 }
             }
@@ -288,7 +290,7 @@ public abstract class AbstractSinkOperator implements 
StreamSinkOperator {
     }
 
     @Override
-    public SinkConfig getSinkConfig(StreamSink sink) {
+    public SinkConfig getSinkConfig(InlongGroupInfo groupInfo, 
InlongStreamInfo streamInfo, StreamSink sink) {
         throw new BusinessException(String.format("not support get sink config 
for sink type=%s", sink.getSinkType()));
     }
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkOperator.java
index 87e3924b38..906fbc36f0 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkOperator.java
@@ -21,10 +21,12 @@ import 
org.apache.inlong.common.pojo.sort.dataflow.sink.SinkConfig;
 import org.apache.inlong.manager.common.enums.SinkStatus;
 import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
 import org.apache.inlong.manager.pojo.common.PageResult;
+import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.node.DataNodeInfo;
 import org.apache.inlong.manager.pojo.sink.SinkField;
 import org.apache.inlong.manager.pojo.sink.SinkRequest;
 import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.pojo.stream.StreamField;
 
 import com.github.pagehelper.Page;
@@ -131,8 +133,11 @@ public interface StreamSinkOperator {
     /**
      * Get the sink config.
      *
+     * @param groupInfo inlong group info
+     * @param streamInfo inlong stream info
      * @param sink sink info
      * @return sink config
      */
-    SinkConfig getSinkConfig(StreamSink sink);
+    SinkConfig getSinkConfig(InlongGroupInfo groupInfo, InlongStreamInfo 
streamInfo, StreamSink sink);
+
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/cls/ClsSinkOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/cls/ClsSinkOperator.java
index 23aa02166e..0012a42d43 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/cls/ClsSinkOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/cls/ClsSinkOperator.java
@@ -32,6 +32,7 @@ import org.apache.inlong.manager.dao.entity.DataNodeEntity;
 import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
 import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
 import org.apache.inlong.manager.dao.mapper.DataNodeEntityMapper;
+import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.node.DataNodeInfo;
 import org.apache.inlong.manager.pojo.node.cls.ClsDataNodeDTO;
 import org.apache.inlong.manager.pojo.node.cls.ClsDataNodeInfo;
@@ -43,6 +44,7 @@ import org.apache.inlong.manager.pojo.sink.cls.ClsSinkDTO;
 import org.apache.inlong.manager.pojo.sink.cls.ClsSinkRequest;
 import org.apache.inlong.manager.pojo.sort.util.FieldInfoUtils;
 import org.apache.inlong.manager.pojo.stream.InlongStreamExtParam;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -147,8 +149,9 @@ public class ClsSinkOperator extends AbstractSinkOperator {
     }
 
     @Override
-    public SinkConfig getSinkConfig(StreamSink sink) {
-        ClsSinkConfig sinkConfig = CommonBeanUtils.copyProperties(sink, 
ClsSinkConfig::new);
+    public SinkConfig getSinkConfig(InlongGroupInfo groupInfo, 
InlongStreamInfo streamInfo, StreamSink sink) {
+        ClsSink clsSink = (ClsSink) sink;
+        ClsSinkConfig sinkConfig = CommonBeanUtils.copyProperties(clsSink, 
ClsSinkConfig::new);
         List<FieldConfig> fields = 
sinkFieldMapper.selectBySinkId(sink.getId()).stream().map(
                 v -> {
                     FieldConfig fieldConfig = new FieldConfig();
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java
index 659b934f8a..753ae75aa6 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java
@@ -30,6 +30,7 @@ import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
 import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
 import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
+import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.node.DataNodeInfo;
 import org.apache.inlong.manager.pojo.sink.SinkField;
 import org.apache.inlong.manager.pojo.sink.SinkRequest;
@@ -40,6 +41,7 @@ import 
org.apache.inlong.manager.pojo.sink.es.ElasticsearchSinkDTO;
 import org.apache.inlong.manager.pojo.sink.es.ElasticsearchSinkRequest;
 import org.apache.inlong.manager.pojo.sort.util.FieldInfoUtils;
 import org.apache.inlong.manager.pojo.stream.InlongStreamExtParam;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -193,8 +195,12 @@ public class ElasticsearchSinkOperator extends 
AbstractSinkOperator {
     }
 
     @Override
-    public SinkConfig getSinkConfig(StreamSink sink) {
-        EsSinkConfig sinkConfig = CommonBeanUtils.copyProperties(sink, 
EsSinkConfig::new);
+    public SinkConfig getSinkConfig(InlongGroupInfo groupInfo, 
InlongStreamInfo streamInfo, StreamSink sink) {
+        ElasticsearchSink elasticsearchSink = (ElasticsearchSink) sink;
+        StreamSinkEntity streamSinkEntity = 
sinkMapper.selectByPrimaryKey(sink.getId());
+        ElasticsearchSinkDTO elasticsearchSinkDTO = 
ElasticsearchSinkDTO.getFromJson(streamSinkEntity.getExtParams());
+        EsSinkConfig sinkConfig = 
CommonBeanUtils.copyProperties(elasticsearchSink, EsSinkConfig::new);
+        CommonBeanUtils.copyProperties(elasticsearchSinkDTO, sinkConfig);
         List<FieldConfig> fields = 
sinkFieldMapper.selectBySinkId(sink.getId()).stream().map(
                 v -> {
                     FieldConfig fieldConfig = new FieldConfig();
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kafka/KafkaSinkOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kafka/KafkaSinkOperator.java
index 169e7d1461..3ce54c7d5e 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kafka/KafkaSinkOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kafka/KafkaSinkOperator.java
@@ -26,6 +26,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
+import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.node.DataNodeInfo;
 import org.apache.inlong.manager.pojo.sink.SinkField;
 import org.apache.inlong.manager.pojo.sink.SinkRequest;
@@ -34,6 +35,7 @@ import org.apache.inlong.manager.pojo.sink.kafka.KafkaSink;
 import org.apache.inlong.manager.pojo.sink.kafka.KafkaSinkDTO;
 import org.apache.inlong.manager.pojo.sink.kafka.KafkaSinkRequest;
 import org.apache.inlong.manager.pojo.sort.util.FieldInfoUtils;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
@@ -119,8 +121,9 @@ public class KafkaSinkOperator extends AbstractSinkOperator 
{
     }
 
     @Override
-    public SinkConfig getSinkConfig(StreamSink sink) {
-        KafkaSinkConfig sinkConfig = CommonBeanUtils.copyProperties(sink, 
KafkaSinkConfig::new);
+    public SinkConfig getSinkConfig(InlongGroupInfo groupInfo, 
InlongStreamInfo streamInfo, StreamSink sink) {
+        KafkaSink kafkaSink = (KafkaSink) sink;
+        KafkaSinkConfig sinkConfig = CommonBeanUtils.copyProperties(kafkaSink, 
KafkaSinkConfig::new);
         List<FieldConfig> fields = 
sinkFieldMapper.selectBySinkId(sink.getId()).stream().map(
                 v -> {
                     FieldConfig fieldConfig = new FieldConfig();
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/pulsar/PulsarSinkOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/pulsar/PulsarSinkOperator.java
index 7a84d5aee3..50ce6bcd72 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/pulsar/PulsarSinkOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/pulsar/PulsarSinkOperator.java
@@ -30,6 +30,7 @@ import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.dao.entity.DataNodeEntity;
 import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
 import org.apache.inlong.manager.dao.mapper.DataNodeEntityMapper;
+import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.node.DataNodeInfo;
 import org.apache.inlong.manager.pojo.node.pulsar.PulsarDataNodeDTO;
 import org.apache.inlong.manager.pojo.sink.SinkField;
@@ -39,6 +40,7 @@ import org.apache.inlong.manager.pojo.sink.pulsar.PulsarSink;
 import org.apache.inlong.manager.pojo.sink.pulsar.PulsarSinkDTO;
 import org.apache.inlong.manager.pojo.sink.pulsar.PulsarSinkRequest;
 import org.apache.inlong.manager.pojo.sort.util.FieldInfoUtils;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
@@ -137,8 +139,9 @@ public class PulsarSinkOperator extends 
AbstractSinkOperator {
     }
 
     @Override
-    public SinkConfig getSinkConfig(StreamSink sink) {
-        PulsarSinkConfig sinkConfig = CommonBeanUtils.copyProperties(sink, 
PulsarSinkConfig::new);
+    public SinkConfig getSinkConfig(InlongGroupInfo groupInfo, 
InlongStreamInfo streamInfo, StreamSink sink) {
+        PulsarSink pulsarSink = (PulsarSink) sink;
+        PulsarSinkConfig sinkConfig = 
CommonBeanUtils.copyProperties(pulsarSink, PulsarSinkConfig::new);
         List<FieldConfig> fields = 
sinkFieldMapper.selectBySinkId(sink.getId()).stream().map(
                 v -> {
                     FieldConfig fieldConfig = new FieldConfig();
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
index 281858abb2..9167ab02c7 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
@@ -606,7 +606,6 @@ public class InlongStreamServiceImpl implements 
InlongStreamService {
         List<InlongStreamExtInfo> extList = request.getExtList();
         saveOrUpdateExt(groupId, streamId, extList);
         if (request.getSyncField()) {
-            LOGGER.info("test begin sync field={}", request);
             List<StreamSinkEntity> sinkEntityList = 
sinkMapper.selectByRelatedId(groupId, streamId);
             for (StreamSinkEntity sinkEntity : sinkEntityList) {
                 StreamSinkOperator sinkOperator = 
sinkOperatorFactory.getInstance(sinkEntity.getSinkType());
diff --git 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/SortController.java
 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/SortController.java
index a8d5faad54..887ecf8b9e 100644
--- 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/SortController.java
+++ 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/SortController.java
@@ -62,7 +62,7 @@ public class SortController {
     @ApiOperation(value = "get sort config")
     public SortConfigResponse getSortConfig(
             @RequestParam String clusterName,
-            @RequestParam String md5) {
+            @RequestParam(required = false) String md5) {
         return sortService.getSortConfig(clusterName, md5);
     }
 


Reply via email to