This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 40548ae01b [INLONG-10229][Sort] EsSink support unified configuration 
(#10237)
40548ae01b is described below

commit 40548ae01b3ae294113484c456ea99c815c340ee
Author: vernedeng <[email protected]>
AuthorDate: Fri May 17 19:51:55 2024 +0800

    [INLONG-10229][Sort] EsSink support unified configuration (#10237)
---
 .../inlong/common/pojo/sort/node/NodeConfig.java   |   2 +
 .../config/holder/v2/SortConfigHolder.java         |   2 +
 .../v2/ClassResourceSortClusterConfigLoader.java   |   5 +-
 .../sort/standalone/sink/cls/ClsSinkContext.java   |   5 +-
 .../standalone/sink/elasticsearch/EsIdConfig.java  | 196 ++++-----------------
 .../sort/standalone/sink/elasticsearch/EsSink.java |   2 +-
 .../sink/elasticsearch/EsSinkContext.java          |  70 ++++----
 .../src/test/java/SortClusterConfig.conf           | 193 +++++++++++++-------
 .../src/test/resources/SortClusterConfig.conf      | 193 +++++++++++++-------
 9 files changed, 330 insertions(+), 338 deletions(-)

diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/node/NodeConfig.java
 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/node/NodeConfig.java
index 921bdf2e02..1c82d4c9e2 100644
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/node/NodeConfig.java
+++ 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/node/NodeConfig.java
@@ -24,6 +24,7 @@ import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSub
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
 
 import java.io.Serializable;
+import java.util.Map;
 
 @Data
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@@ -36,4 +37,5 @@ public abstract class NodeConfig implements Serializable {
 
     private Integer version;
     private String nodeName;
+    private Map<String, String> properties;
 }
diff --git 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/v2/SortConfigHolder.java
 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/v2/SortConfigHolder.java
index eb2bf9bf92..272df1cc19 100644
--- 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/v2/SortConfigHolder.java
+++ 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/v2/SortConfigHolder.java
@@ -130,9 +130,11 @@ public class SortConfigHolder {
                                     .stream()
                                     .map(SortClusterConfig::getDataFlowConfigs)
                                     .flatMap(Collection::stream)
+                                    .filter(flow -> 
StringUtils.isNotEmpty(flow.getAuditTag()))
                                     .collect(Collectors.toMap(flow -> 
InlongId.generateUid(flow.getInlongGroupId(),
                                             flow.getInlongStreamId()),
                                             DataFlowConfig::getAuditTag))));
+            this.config = newConfig;
         } catch (Throwable e) {
             log.error("failed to reload sort config", e);
         }
diff --git 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/v2/ClassResourceSortClusterConfigLoader.java
 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/v2/ClassResourceSortClusterConfigLoader.java
index 629d8fef54..0d3cd08089 100644
--- 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/v2/ClassResourceSortClusterConfigLoader.java
+++ 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/v2/ClassResourceSortClusterConfigLoader.java
@@ -21,8 +21,8 @@ import org.apache.inlong.common.pojo.sort.SortConfig;
 import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigType;
 import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.commons.io.IOUtils;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flume.Context;
 import org.slf4j.Logger;
 
@@ -45,7 +45,8 @@ public class ClassResourceSortClusterConfigLoader implements 
SortConfigLoader {
                     Charset.defaultCharset());
             int index = confString.indexOf('{');
             confString = confString.substring(index);
-            return objectMapper.readValue(confString, SortConfig.class);
+            SortConfig config = objectMapper.readValue(confString, 
SortConfig.class);
+            return config;
         } catch (Exception e) {
             LOG.error("failed to load properties, file ={}", fileName, e);
         }
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
index fef43cadfc..34bddaed32 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
@@ -105,8 +105,9 @@ public class ClsSinkContext extends SinkContext {
                     objectMapper.writeValueAsString(newSortTaskConfig));
             this.sortTaskConfig = newSortTaskConfig;
             ClsNodeConfig requestNodeConfig = (ClsNodeConfig) 
sortTaskConfig.getNodeConfig();
-            this.clsNodeConfig =
-                    requestNodeConfig.getVersion() >= 
clsNodeConfig.getVersion() ? requestNodeConfig : clsNodeConfig;
+            if (clsNodeConfig == null || requestNodeConfig.getVersion() > 
clsNodeConfig.getVersion()) {
+                this.clsNodeConfig = requestNodeConfig;
+            }
             this.keywordMaxLength = DEFAULT_KEYWORD_MAX_LENGTH;
             this.reloadIdParams();
             this.reloadClients();
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsIdConfig.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsIdConfig.java
index 1782bdf262..60d5d447dd 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsIdConfig.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsIdConfig.java
@@ -17,16 +17,24 @@
 
 package org.apache.inlong.sort.standalone.sink.elasticsearch;
 
+import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.field.FieldConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.sink.EsSinkConfig;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
 import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Date;
 import java.util.List;
+import java.util.stream.Collectors;
 
-/**
- * 
- * EsIdConfig
- */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
 public class EsIdConfig {
 
     public static final String PATTERN_DAY = "{yyyyMMdd}";
@@ -63,175 +71,35 @@ public class EsIdConfig {
     private int contentOffset = 0;// except for boss + tab(1)
     private List<String> fieldList;
 
-    /**
-     * get inlongGroupId
-     * 
-     * @return the inlongGroupId
-     */
-    public String getInlongGroupId() {
-        return inlongGroupId;
-    }
-
-    /**
-     * set inlongGroupId
-     * 
-     * @param inlongGroupId the inlongGroupId to set
-     */
-    public void setInlongGroupId(String inlongGroupId) {
-        this.inlongGroupId = inlongGroupId;
-    }
-
-    /**
-     * get inlongStreamId
-     * 
-     * @return the inlongStreamId
-     */
-    public String getInlongStreamId() {
-        return inlongStreamId;
-    }
-
-    /**
-     * set inlongStreamId
-     * 
-     * @param inlongStreamId the inlongStreamId to set
-     */
-    public void setInlongStreamId(String inlongStreamId) {
-        this.inlongStreamId = inlongStreamId;
-    }
-
-    /**
-     * get separator
-     * 
-     * @return the separator
-     */
-    public String getSeparator() {
-        return separator;
-    }
-
-    /**
-     * set separator
-     * 
-     * @param separator the separator to set
-     */
-    public void setSeparator(String separator) {
-        this.separator = separator;
-    }
-
-    /**
-     * get indexNamePattern
-     * 
-     * @return the indexNamePattern
-     */
-    public String getIndexNamePattern() {
-        return indexNamePattern;
-    }
-
-    /**
-     * set indexNamePattern
-     * 
-     * @param indexNamePattern the indexNamePattern to set
-     */
-    public void setIndexNamePattern(String indexNamePattern) {
-        this.indexNamePattern = indexNamePattern;
-    }
-
-    /**
-     * get fieldOffset
-     * 
-     * @return the fieldOffset
-     */
-    public int getFieldOffset() {
-        return fieldOffset;
-    }
-
-    /**
-     * set fieldOffset
-     * 
-     * @param fieldOffset the fieldOffset to set
-     */
-    public void setFieldOffset(int fieldOffset) {
-        this.fieldOffset = fieldOffset;
-    }
-
-    /**
-     * get fieldList
-     * 
-     * @return the fieldList
-     */
-    public List<String> getFieldList() {
-        if (fieldList == null) {
-            this.fieldList = new ArrayList<>();
-            if (fieldNames != null) {
-                String[] fieldNameArray = fieldNames.split("\\s+");
-                this.fieldList.addAll(Arrays.asList(fieldNameArray));
-            }
-        }
-        return fieldList;
-    }
-
-    /**
-     * set fieldList
-     * 
-     * @param fieldList the fieldList to set
-     */
-    public void setFieldList(List<String> fieldList) {
-        this.fieldList = fieldList;
-    }
-
-    /**
-     * get fieldNames
-     * 
-     * @return the fieldNames
-     */
-    public String getFieldNames() {
-        return fieldNames;
-    }
-
-    /**
-     * set fieldNames
-     * 
-     * @param fieldNames the fieldNames to set
-     */
-    public void setFieldNames(String fieldNames) {
-        this.fieldNames = fieldNames;
-    }
-
-    /**
-     * get contentOffset
-     * 
-     * @return the contentOffset
-     */
-    public int getContentOffset() {
-        return contentOffset;
-    }
-
-    /**
-     * set contentOffset
-     * 
-     * @param contentOffset the contentOffset to set
-     */
-    public void setContentOffset(int contentOffset) {
-        this.contentOffset = contentOffset;
+    public static EsIdConfig create(DataFlowConfig dataFlowConfig) {
+        EsSinkConfig sinkConfig = (EsSinkConfig) 
dataFlowConfig.getSinkConfig();
+        List<String> fields = sinkConfig.getFieldConfigs()
+                .stream()
+                .map(FieldConfig::getName)
+                .collect(Collectors.toList());
+        return EsIdConfig.builder()
+                .inlongGroupId(dataFlowConfig.getInlongGroupId())
+                .inlongStreamId(dataFlowConfig.getInlongStreamId())
+                .contentOffset(sinkConfig.getContentOffset())
+                .fieldOffset(sinkConfig.getFieldOffset())
+                .separator(sinkConfig.getSeparator())
+                .indexNamePattern(sinkConfig.getIndexNamePattern())
+                .fieldList(fields)
+                .build();
     }
 
-    /**
-     * parseIndexName
-     * 
-     * @param  msgTime
-     * @return
-     */
     public String parseIndexName(long msgTime) {
         Date dtDate = new Date(msgTime);
         String result = indexNamePattern;
-        if (result.indexOf(PATTERN_MINUTE) >= 0) {
+        if (result.contains(PATTERN_MINUTE)) {
             String strHour = FORMAT_MINUTE.get().format(dtDate);
             result = result.replaceAll(REGEX_MINUTE, strHour);
         }
-        if (result.indexOf(PATTERN_HOUR) >= 0) {
+        if (result.contains(PATTERN_HOUR)) {
             String strHour = FORMAT_HOUR.get().format(dtDate);
             result = result.replaceAll(REGEX_HOUR, strHour);
         }
-        if (result.indexOf(PATTERN_DAY) >= 0) {
+        if (result.contains(PATTERN_DAY)) {
             String strHour = FORMAT_DAY.get().format(dtDate);
             result = result.replaceAll(REGEX_DAY, strHour);
         }
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSink.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSink.java
index f283e5c855..a6d3dba5b3 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSink.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSink.java
@@ -17,7 +17,7 @@
 
 package org.apache.inlong.sort.standalone.sink.elasticsearch;
 
-import org.apache.inlong.sort.standalone.sink.SinkContext;
+import org.apache.inlong.sort.standalone.sink.v2.SinkContext;
 import org.apache.inlong.sort.standalone.utils.BufferQueue;
 
 import org.apache.flume.Context;
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
index 87714c1149..e9dcb26a7f 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
@@ -17,19 +17,19 @@
 
 package org.apache.inlong.sort.standalone.sink.elasticsearch;
 
-import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
+import org.apache.inlong.common.pojo.sort.SortClusterConfig;
+import org.apache.inlong.common.pojo.sort.SortTaskConfig;
+import org.apache.inlong.common.pojo.sort.node.EsNodeConfig;
 import org.apache.inlong.sort.standalone.channel.ProfileEvent;
 import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
-import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
+import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder;
 import org.apache.inlong.sort.standalone.config.pojo.InlongId;
 import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
 import org.apache.inlong.sort.standalone.metrics.audit.AuditUtils;
-import org.apache.inlong.sort.standalone.sink.SinkContext;
+import org.apache.inlong.sort.standalone.sink.v2.SinkContext;
 import org.apache.inlong.sort.standalone.utils.BufferQueue;
-import org.apache.inlong.sort.standalone.utils.Constants;
 import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
 
-import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.commons.lang3.ClassUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -40,11 +40,13 @@ import org.apache.http.HttpHost;
 import org.slf4j.Logger;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 /**
  * 
@@ -85,8 +87,10 @@ public class EsSinkContext extends SinkContext {
     public static final boolean DEFAULT_IS_USE_INDEX_ID = false;
 
     private Context sinkContext;
+    private EsNodeConfig esNodeConfig;
     private String nodeId;
     private Map<String, EsIdConfig> idConfigMap = new ConcurrentHashMap<>();
+    private ObjectMapper objectMapper = new ObjectMapper();
     private final BufferQueue<EsIndexRequest> dispatchQueue;
     private AtomicLong offerCounter = new AtomicLong(0);
     private AtomicLong takeCounter = new AtomicLong(0);
@@ -134,48 +138,48 @@ public class EsSinkContext extends SinkContext {
             LOG.info("SortTask:{},dispatchQueue:{},offer:{},take:{},back:{}",
                     taskName, dispatchQueue.size(), offerCounter.getAndSet(0),
                     takeCounter.getAndSet(0), backCounter.getAndSet(0));
-            SortTaskConfig newSortTaskConfig = 
SortClusterConfigHolder.getTaskConfig(taskName);
+            SortTaskConfig newSortTaskConfig = 
SortConfigHolder.getTaskConfig(taskName);
             if (this.sortTaskConfig != null && 
this.sortTaskConfig.equals(newSortTaskConfig)) {
                 return;
             }
             LOG.info("get new SortTaskConfig:taskName:{}:config:{}", taskName,
-                    new ObjectMapper().writeValueAsString(newSortTaskConfig));
+                    objectMapper.writeValueAsString(newSortTaskConfig));
             this.sortTaskConfig = newSortTaskConfig;
-            this.sinkContext = new 
Context(this.sortTaskConfig.getSinkParams());
-            // parse the config of id and topic
-            Map<String, EsIdConfig> newIdConfigMap = new ConcurrentHashMap<>();
-            List<Map<String, String>> idList = 
this.sortTaskConfig.getIdParams();
-            ObjectMapper objectMapper = new ObjectMapper();
-            
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
-            for (Map<String, String> idParam : idList) {
-                String inlongGroupId = idParam.get(Constants.INLONG_GROUP_ID);
-                String inlongStreamId = 
idParam.get(Constants.INLONG_STREAM_ID);
-                String uid = InlongId.generateUid(inlongGroupId, 
inlongStreamId);
-                String jsonIdConfig = objectMapper.writeValueAsString(idParam);
-                EsIdConfig idConfig = objectMapper.readValue(jsonIdConfig, 
EsIdConfig.class);
-                idConfig.getFieldList();
-                newIdConfigMap.put(uid, idConfig);
+            EsNodeConfig requestNodeConfig = (EsNodeConfig) 
sortTaskConfig.getNodeConfig();
+            if (esNodeConfig == null || requestNodeConfig.getVersion() > 
esNodeConfig.getVersion()) {
+                this.esNodeConfig = requestNodeConfig;
             }
+            Map<String, String> properties = 
this.sortTaskConfig.getNodeConfig().getProperties();
+            this.sinkContext = new Context(properties != null ? properties : 
new HashMap<>());
             // change current config
-            this.idConfigMap = newIdConfigMap;
+            this.idConfigMap = this.sortTaskConfig.getClusters()
+                    .stream()
+                    .map(SortClusterConfig::getDataFlowConfigs)
+                    .flatMap(Collection::stream)
+                    .map(EsIdConfig::create)
+                    .collect(Collectors.toMap(
+                            config -> 
InlongId.generateUid(config.getInlongGroupId(), config.getInlongStreamId()),
+                            v -> v,
+                            (flow1, flow2) -> flow1));
             // rest client
-            this.username = sinkContext.getString(KEY_USERNAME);
-            this.password = sinkContext.getString(KEY_PASSWORD);
-            this.bulkAction = sinkContext.getInteger(KEY_BULK_ACTION, 
DEFAULT_BULK_ACTION);
-            this.bulkSizeMb = sinkContext.getInteger(KEY_BULK_SIZE_MB, 
DEFAULT_BULK_SIZE_MB);
-            this.flushInterval = sinkContext.getInteger(KEY_FLUSH_INTERVAL, 
DEFAULT_FLUSH_INTERVAL);
-            this.concurrentRequests = 
sinkContext.getInteger(KEY_CONCURRENT_REQUESTS, DEFAULT_CONCURRENT_REQUESTS);
-            this.maxConnect = sinkContext.getInteger(KEY_MAX_CONNECT_TOTAL, 
DEFAULT_MAX_CONNECT_TOTAL);
+            this.username = esNodeConfig.getUsername();
+            this.password = esNodeConfig.getPassword();
+            this.bulkAction = esNodeConfig.getBulkAction();
+            this.bulkSizeMb = esNodeConfig.getBulkSizeMb();
+            this.flushInterval = esNodeConfig.getFlushInterval();
+            this.concurrentRequests = esNodeConfig.getConcurrentRequests();
+            this.maxConnect = esNodeConfig.getMaxConnect();
+            this.keywordMaxLength = esNodeConfig.getKeywordMaxLength();
+            this.isUseIndexId = esNodeConfig.getIsUseIndexId();
+
             this.maxConnectPerRoute = 
sinkContext.getInteger(KEY_MAX_CONNECT_PER_ROUTE, 
DEFAULT_MAX_CONNECT_PER_ROUTE);
             this.connectionRequestTimeout =
                     sinkContext.getInteger(KEY_CONNECTION_REQUEST_TIMEOUT, 
DEFAULT_CONNECTION_REQUEST_TIMEOUT);
             this.socketTimeout = sinkContext.getInteger(KEY_SOCKET_TIMEOUT, 
DEFAULT_SOCKET_TIMEOUT);
             this.maxRedirects = sinkContext.getInteger(KEY_MAX_REDIRECTS, 
DEFAULT_MAX_REDIRECTS);
             this.logMaxLength = sinkContext.getInteger(KEY_LOG_MAX_LENGTH, 
DEFAULT_LOG_MAX_LENGTH);
-            this.keywordMaxLength = 
sinkContext.getInteger(KEY_KEYWORD_MAX_LENGTH, DEFAULT_KEYWORD_MAX_LENGTH);
-            this.isUseIndexId = sinkContext.getBoolean(KEY_IS_USE_INDEX_ID, 
DEFAULT_IS_USE_INDEX_ID);
             // http host
-            this.strHttpHosts = sinkContext.getString(KEY_HTTP_HOSTS);
+            this.strHttpHosts = esNodeConfig.getHttpHosts();
             if (!StringUtils.isBlank(strHttpHosts)) {
                 String[] strHttpHostArray = strHttpHosts.split("\\s+");
                 List<HttpHost> newHttpHosts = new 
ArrayList<>(strHttpHostArray.length);
@@ -192,7 +196,7 @@ public class EsSinkContext extends SinkContext {
             }
             // log
             LOG.info("end to get 
SortTaskConfig:taskName:{}:newIdConfigMap:{}", taskName,
-                    new ObjectMapper().writeValueAsString(newIdConfigMap));
+                    objectMapper.writeValueAsString(idConfigMap));
         } catch (Exception e) {
             LOG.error(e.getMessage(), e);
         }
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/test/java/SortClusterConfig.conf
 
b/inlong-sort-standalone/sort-standalone-source/src/test/java/SortClusterConfig.conf
index 10c51cd0fb..637698dfa7 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/test/java/SortClusterConfig.conf
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/test/java/SortClusterConfig.conf
@@ -14,72 +14,129 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
- 
+
 {
-               "clusterName":"esv3-sz-sz1",
-               "sortTasks":[
-                       {
-                               "idParams":[
-                                       {
-                                               "inlongGroupId":"0fc00000046",
-                                               "inlongStreamId":"",
-                                               "separator":"|",
-                                               
"indexNamePattern":"inlong0fc00000046_{yyyyMMdd}",
-                                               "fieldNames":"ftime extinfo t1 
t2 t3 t4",
-                                               "fieldOffset":2,
-                                               "contentOffset":0
-                                       },
-                                       {
-                                               "inlongGroupId":"03600000045",
-                                               "inlongStreamId":"",
-                                               "separator":"|",
-                                               
"indexNamePattern":"inlong03600000045_{yyyyMMdd}",
-                                               "fieldNames":"ftime extinfo t1 
t2 t3",
-                                               "fieldOffset":2,
-                                               "contentOffset":0
-                                       },
-                                       {
-                                               "inlongGroupId":"05100054990",
-                                               "inlongStreamId":"",
-                                               "separator":"|",
-                                               
"indexNamePattern":"inlong05100054990_{yyyyMMdd}",
-                                               "fieldNames":"ftime extinfo 
field1 field2 field3 field4",
-                                               "fieldOffset":2,
-                                               "contentOffset":0
-                                       },
-                                       {
-                                               "inlongGroupId":"09c00014434",
-                                               "inlongStreamId":"",
-                                               "separator":"|",
-                                               
"indexNamePattern":"inlong09c00014434_{yyyyMMdd}",
-                                               "fieldNames":"ftime extinfo id 
elog containerName cid setName ip",
-                                               "fieldOffset":2,
-                                               "contentOffset":0
-                                       },
-                                       {
-                                               "inlongGroupId":"0c900035509",
-                                               "inlongStreamId":"",
-                                               "separator":"|",
-                                               
"indexNamePattern":"inlong0c900035509_{yyyyMMdd}",
-                                               "fieldNames":"ftime extinfo 
statTime msgTime senderIp containerName cid kafkaGroupId KafkaClusterId topic 
partitionId count size lastMaxOffset minOffset maxOffset minAckOffset 
maxAckOffset",
-                                               "fieldOffset":2,
-                                               "contentOffset":0
-                                       }
-                               ],
-                               "name":"sid_es_es-rmrv7g7a_v3",
-                               "sinkParams":{
-                                       "httpHosts":"127.0.0.1:9200",
-                                       "username":"elastic",
-                                       "password":"elastic",
-                                       "bulkAction":4000,
-                                       "bulkSizeMb":10,
-                                       "flushInterval":60,
-                                       "concurrentRequests":5,
-                                       "maxConnect":10,
-                                       "keywordMaxLength":32767,
-                                       "isUseIndexId":false
-                               },
-                               "type":"ElasticSearch"
-                       }
-               ]
-       }
+    "sortClusterName": "esv3-gz-gz1",
+    "tasks": [
+        {
+            "sortTaskName": "sid_es_es-rmrv7g7a_v3",
+            "clusters": [
+                {
+                    "clusterTag": "default_cluster",
+                    "mqClusterConfigs": [
+                        {
+                            "type": "PULSAR",
+                            "version": "9",
+                            "clusterName": "xxxx",
+                            "adminUrl": "xxx",
+                            "serviceUrl": "xxx",
+                            "token": ""
+                        }
+                    ],
+                    "dataFlowConfigs": [
+                        {
+                            "dataflowId": "553",
+                            "version": 0,
+                            "auditTag": "553",
+                            "inlongGroupId": "",
+                            "inlongStreamId": "0fc00000046",
+                            "sourceConfig": {
+                                "topic": 
"persistent://tenant/namespace/0fc00000046",
+                                "subscription": "sid_es_v3",
+                                "encodingType": "UTF-8",
+                                "deserializationConfig": {
+                                    "type": "INLONG_MSG",
+                                    "streamId": "0fc00000046"
+                                },
+                                "dataTypeConfig": {
+                                    "type": "CSV",
+                                    "delimiter": "|",
+                                    "escapeChar": "\\"
+                                },
+                                "fieldConfigs": [
+                                    {
+                                        "name": "name",
+                                        "formatInfo": {
+                                            "type": "string"
+                                        }
+                                    },
+                                    {
+                                        "name": "var1",
+                                        "formatInfo": {
+                                            "type": "string"
+                                        }
+                                    }
+                                ]
+                            },
+                            "sinkConfig": {
+                                "type": "ELASTICSEARCH",
+                                "encodingType": null,
+                                "fieldConfigs": [
+                                    {
+                                        "name": "ftime",
+                                        "formatInfo": {
+                                            "type": "string"
+                                        }
+                                    },
+                                    {
+                                        "name": "extinfo",
+                                        "formatInfo": {
+                                            "type": "string"
+                                        }
+                                    },
+                                    {
+                                        "name": "t1",
+                                        "formatInfo": {
+                                            "type": "string"
+                                        }
+                                    },
+                                    {
+                                        "name": "t2",
+                                        "formatInfo": {
+                                            "type": "string"
+                                        }
+                                    },
+                                    {
+                                        "name": "t3",
+                                        "formatInfo": {
+                                            "type": "string"
+                                        }
+                                    },
+                                    {
+                                        "name": "t4",
+                                        "formatInfo": {
+                                            "type": "string"
+                                        }
+                                    }
+                                ],
+                                "indexNamePattern": 
"inlong0fc00000046_{yyyyMMdd}",
+                                "contentOffset": 0,
+                                "fieldOffset": 2,
+                                "separator": "|"
+                            },
+                            "properties": null
+                        }
+                    ]
+                }
+            ],
+            "nodeConfig": {
+                "type": "ELASTICSEARCH",
+                "version": 4,
+                "nodeName": "sid_es_es-rmrv7g7a_v3",
+                "bulkAction": 4000,
+                "bulkSizeMb": 10,
+                "flushInterval": 60,
+                "concurrentRequests": 5,
+                "maxConnect": 10,
+                "keywordMaxLength": 32767,
+                "isUseIndexId": false,
+                "maxThreads": 2,
+                "auditSetName": null,
+                "httpHosts": "127.0.0.1:9200",
+                "username": "elastic",
+                "token": "password",
+                "password": "password"
+            }
+        }
+    ]
+}
\ No newline at end of file
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/test/resources/SortClusterConfig.conf
 
b/inlong-sort-standalone/sort-standalone-source/src/test/resources/SortClusterConfig.conf
index 10c51cd0fb..637698dfa7 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/test/resources/SortClusterConfig.conf
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/test/resources/SortClusterConfig.conf
@@ -14,72 +14,129 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
- 
+
 {
-               "clusterName":"esv3-sz-sz1",
-               "sortTasks":[
-                       {
-                               "idParams":[
-                                       {
-                                               "inlongGroupId":"0fc00000046",
-                                               "inlongStreamId":"",
-                                               "separator":"|",
-                                               
"indexNamePattern":"inlong0fc00000046_{yyyyMMdd}",
-                                               "fieldNames":"ftime extinfo t1 
t2 t3 t4",
-                                               "fieldOffset":2,
-                                               "contentOffset":0
-                                       },
-                                       {
-                                               "inlongGroupId":"03600000045",
-                                               "inlongStreamId":"",
-                                               "separator":"|",
-                                               
"indexNamePattern":"inlong03600000045_{yyyyMMdd}",
-                                               "fieldNames":"ftime extinfo t1 
t2 t3",
-                                               "fieldOffset":2,
-                                               "contentOffset":0
-                                       },
-                                       {
-                                               "inlongGroupId":"05100054990",
-                                               "inlongStreamId":"",
-                                               "separator":"|",
-                                               
"indexNamePattern":"inlong05100054990_{yyyyMMdd}",
-                                               "fieldNames":"ftime extinfo 
field1 field2 field3 field4",
-                                               "fieldOffset":2,
-                                               "contentOffset":0
-                                       },
-                                       {
-                                               "inlongGroupId":"09c00014434",
-                                               "inlongStreamId":"",
-                                               "separator":"|",
-                                               
"indexNamePattern":"inlong09c00014434_{yyyyMMdd}",
-                                               "fieldNames":"ftime extinfo id 
elog containerName cid setName ip",
-                                               "fieldOffset":2,
-                                               "contentOffset":0
-                                       },
-                                       {
-                                               "inlongGroupId":"0c900035509",
-                                               "inlongStreamId":"",
-                                               "separator":"|",
-                                               
"indexNamePattern":"inlong0c900035509_{yyyyMMdd}",
-                                               "fieldNames":"ftime extinfo 
statTime msgTime senderIp containerName cid kafkaGroupId KafkaClusterId topic 
partitionId count size lastMaxOffset minOffset maxOffset minAckOffset 
maxAckOffset",
-                                               "fieldOffset":2,
-                                               "contentOffset":0
-                                       }
-                               ],
-                               "name":"sid_es_es-rmrv7g7a_v3",
-                               "sinkParams":{
-                                       "httpHosts":"127.0.0.1:9200",
-                                       "username":"elastic",
-                                       "password":"elastic",
-                                       "bulkAction":4000,
-                                       "bulkSizeMb":10,
-                                       "flushInterval":60,
-                                       "concurrentRequests":5,
-                                       "maxConnect":10,
-                                       "keywordMaxLength":32767,
-                                       "isUseIndexId":false
-                               },
-                               "type":"ElasticSearch"
-                       }
-               ]
-       }
+    "sortClusterName": "esv3-gz-gz1",
+    "tasks": [
+        {
+            "sortTaskName": "sid_es_es-rmrv7g7a_v3",
+            "clusters": [
+                {
+                    "clusterTag": "default_cluster",
+                    "mqClusterConfigs": [
+                        {
+                            "type": "PULSAR",
+                            "version": "9",
+                            "clusterName": "xxxx",
+                            "adminUrl": "xxx",
+                            "serviceUrl": "xxx",
+                            "token": ""
+                        }
+                    ],
+                    "dataFlowConfigs": [
+                        {
+                            "dataflowId": "553",
+                            "version": 0,
+                            "auditTag": "553",
+                            "inlongGroupId": "",
+                            "inlongStreamId": "0fc00000046",
+                            "sourceConfig": {
+                                "topic": 
"persistent://tenant/namespace/0fc00000046",
+                                "subscription": "sid_es_v3",
+                                "encodingType": "UTF-8",
+                                "deserializationConfig": {
+                                    "type": "INLONG_MSG",
+                                    "streamId": "0fc00000046"
+                                },
+                                "dataTypeConfig": {
+                                    "type": "CSV",
+                                    "delimiter": "|",
+                                    "escapeChar": "\\"
+                                },
+                                "fieldConfigs": [
+                                    {
+                                        "name": "name",
+                                        "formatInfo": {
+                                            "type": "string"
+                                        }
+                                    },
+                                    {
+                                        "name": "var1",
+                                        "formatInfo": {
+                                            "type": "string"
+                                        }
+                                    }
+                                ]
+                            },
+                            "sinkConfig": {
+                                "type": "ELASTICSEARCH",
+                                "encodingType": null,
+                                "fieldConfigs": [
+                                    {
+                                        "name": "ftime",
+                                        "formatInfo": {
+                                            "type": "string"
+                                        }
+                                    },
+                                    {
+                                        "name": "extinfo",
+                                        "formatInfo": {
+                                            "type": "string"
+                                        }
+                                    },
+                                    {
+                                        "name": "t1",
+                                        "formatInfo": {
+                                            "type": "string"
+                                        }
+                                    },
+                                    {
+                                        "name": "t2",
+                                        "formatInfo": {
+                                            "type": "string"
+                                        }
+                                    },
+                                    {
+                                        "name": "t3",
+                                        "formatInfo": {
+                                            "type": "string"
+                                        }
+                                    },
+                                    {
+                                        "name": "t4",
+                                        "formatInfo": {
+                                            "type": "string"
+                                        }
+                                    }
+                                ],
+                                "indexNamePattern": 
"inlong0fc00000046_{yyyyMMdd}",
+                                "contentOffset": 0,
+                                "fieldOffset": 2,
+                                "separator": "|"
+                            },
+                            "properties": null
+                        }
+                    ]
+                }
+            ],
+            "nodeConfig": {
+                "type": "ELASTICSEARCH",
+                "version": 4,
+                "nodeName": "sid_es_es-rmrv7g7a_v3",
+                "bulkAction": 4000,
+                "bulkSizeMb": 10,
+                "flushInterval": 60,
+                "concurrentRequests": 5,
+                "maxConnect": 10,
+                "keywordMaxLength": 32767,
+                "isUseIndexId": false,
+                "maxThreads": 2,
+                "auditSetName": null,
+                "httpHosts": "127.0.0.1:9200",
+                "username": "elastic",
+                "token": "password",
+                "password": "password"
+            }
+        }
+    ]
+}
\ No newline at end of file


Reply via email to