This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 40548ae01b [INLONG-10229][Sort] EsSink support unified configuration (#10237)
40548ae01b is described below
commit 40548ae01b3ae294113484c456ea99c815c340ee
Author: vernedeng <[email protected]>
AuthorDate: Fri May 17 19:51:55 2024 +0800
[INLONG-10229][Sort] EsSink support unified configuration (#10237)
---
.../inlong/common/pojo/sort/node/NodeConfig.java | 2 +
.../config/holder/v2/SortConfigHolder.java | 2 +
.../v2/ClassResourceSortClusterConfigLoader.java | 5 +-
.../sort/standalone/sink/cls/ClsSinkContext.java | 5 +-
.../standalone/sink/elasticsearch/EsIdConfig.java | 196 ++++-----------------
.../sort/standalone/sink/elasticsearch/EsSink.java | 2 +-
.../sink/elasticsearch/EsSinkContext.java | 70 ++++----
.../src/test/java/SortClusterConfig.conf | 193 +++++++++++++-------
.../src/test/resources/SortClusterConfig.conf | 193 +++++++++++++-------
9 files changed, 330 insertions(+), 338 deletions(-)
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/node/NodeConfig.java
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/node/NodeConfig.java
index 921bdf2e02..1c82d4c9e2 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/node/NodeConfig.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/node/NodeConfig.java
@@ -24,6 +24,7 @@ import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSub
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
import java.io.Serializable;
+import java.util.Map;
@Data
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@@ -36,4 +37,5 @@ public abstract class NodeConfig implements Serializable {
private Integer version;
private String nodeName;
+ private Map<String, String> properties;
}
diff --git
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/v2/SortConfigHolder.java
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/v2/SortConfigHolder.java
index eb2bf9bf92..272df1cc19 100644
---
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/v2/SortConfigHolder.java
+++
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/v2/SortConfigHolder.java
@@ -130,9 +130,11 @@ public class SortConfigHolder {
.stream()
.map(SortClusterConfig::getDataFlowConfigs)
.flatMap(Collection::stream)
+ .filter(flow ->
StringUtils.isNotEmpty(flow.getAuditTag()))
.collect(Collectors.toMap(flow ->
InlongId.generateUid(flow.getInlongGroupId(),
flow.getInlongStreamId()),
DataFlowConfig::getAuditTag))));
+ this.config = newConfig;
} catch (Throwable e) {
log.error("failed to reload sort config", e);
}
diff --git
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/v2/ClassResourceSortClusterConfigLoader.java
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/v2/ClassResourceSortClusterConfigLoader.java
index 629d8fef54..0d3cd08089 100644
---
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/v2/ClassResourceSortClusterConfigLoader.java
+++
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/v2/ClassResourceSortClusterConfigLoader.java
@@ -21,8 +21,8 @@ import org.apache.inlong.common.pojo.sort.SortConfig;
import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigType;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
-import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.io.IOUtils;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flume.Context;
import org.slf4j.Logger;
@@ -45,7 +45,8 @@ public class ClassResourceSortClusterConfigLoader implements
SortConfigLoader {
Charset.defaultCharset());
int index = confString.indexOf('{');
confString = confString.substring(index);
- return objectMapper.readValue(confString, SortConfig.class);
+ SortConfig config = objectMapper.readValue(confString,
SortConfig.class);
+ return config;
} catch (Exception e) {
LOG.error("failed to load properties, file ={}", fileName, e);
}
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
index fef43cadfc..34bddaed32 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
@@ -105,8 +105,9 @@ public class ClsSinkContext extends SinkContext {
objectMapper.writeValueAsString(newSortTaskConfig));
this.sortTaskConfig = newSortTaskConfig;
ClsNodeConfig requestNodeConfig = (ClsNodeConfig)
sortTaskConfig.getNodeConfig();
- this.clsNodeConfig =
- requestNodeConfig.getVersion() >=
clsNodeConfig.getVersion() ? requestNodeConfig : clsNodeConfig;
+ if (clsNodeConfig == null || requestNodeConfig.getVersion() >
clsNodeConfig.getVersion()) {
+ this.clsNodeConfig = requestNodeConfig;
+ }
this.keywordMaxLength = DEFAULT_KEYWORD_MAX_LENGTH;
this.reloadIdParams();
this.reloadClients();
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsIdConfig.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsIdConfig.java
index 1782bdf262..60d5d447dd 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsIdConfig.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsIdConfig.java
@@ -17,16 +17,24 @@
package org.apache.inlong.sort.standalone.sink.elasticsearch;
+import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.field.FieldConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.sink.EsSinkConfig;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Date;
import java.util.List;
+import java.util.stream.Collectors;
-/**
- *
- * EsIdConfig
- */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
public class EsIdConfig {
public static final String PATTERN_DAY = "{yyyyMMdd}";
@@ -63,175 +71,35 @@ public class EsIdConfig {
private int contentOffset = 0;// except for boss + tab(1)
private List<String> fieldList;
- /**
- * get inlongGroupId
- *
- * @return the inlongGroupId
- */
- public String getInlongGroupId() {
- return inlongGroupId;
- }
-
- /**
- * set inlongGroupId
- *
- * @param inlongGroupId the inlongGroupId to set
- */
- public void setInlongGroupId(String inlongGroupId) {
- this.inlongGroupId = inlongGroupId;
- }
-
- /**
- * get inlongStreamId
- *
- * @return the inlongStreamId
- */
- public String getInlongStreamId() {
- return inlongStreamId;
- }
-
- /**
- * set inlongStreamId
- *
- * @param inlongStreamId the inlongStreamId to set
- */
- public void setInlongStreamId(String inlongStreamId) {
- this.inlongStreamId = inlongStreamId;
- }
-
- /**
- * get separator
- *
- * @return the separator
- */
- public String getSeparator() {
- return separator;
- }
-
- /**
- * set separator
- *
- * @param separator the separator to set
- */
- public void setSeparator(String separator) {
- this.separator = separator;
- }
-
- /**
- * get indexNamePattern
- *
- * @return the indexNamePattern
- */
- public String getIndexNamePattern() {
- return indexNamePattern;
- }
-
- /**
- * set indexNamePattern
- *
- * @param indexNamePattern the indexNamePattern to set
- */
- public void setIndexNamePattern(String indexNamePattern) {
- this.indexNamePattern = indexNamePattern;
- }
-
- /**
- * get fieldOffset
- *
- * @return the fieldOffset
- */
- public int getFieldOffset() {
- return fieldOffset;
- }
-
- /**
- * set fieldOffset
- *
- * @param fieldOffset the fieldOffset to set
- */
- public void setFieldOffset(int fieldOffset) {
- this.fieldOffset = fieldOffset;
- }
-
- /**
- * get fieldList
- *
- * @return the fieldList
- */
- public List<String> getFieldList() {
- if (fieldList == null) {
- this.fieldList = new ArrayList<>();
- if (fieldNames != null) {
- String[] fieldNameArray = fieldNames.split("\\s+");
- this.fieldList.addAll(Arrays.asList(fieldNameArray));
- }
- }
- return fieldList;
- }
-
- /**
- * set fieldList
- *
- * @param fieldList the fieldList to set
- */
- public void setFieldList(List<String> fieldList) {
- this.fieldList = fieldList;
- }
-
- /**
- * get fieldNames
- *
- * @return the fieldNames
- */
- public String getFieldNames() {
- return fieldNames;
- }
-
- /**
- * set fieldNames
- *
- * @param fieldNames the fieldNames to set
- */
- public void setFieldNames(String fieldNames) {
- this.fieldNames = fieldNames;
- }
-
- /**
- * get contentOffset
- *
- * @return the contentOffset
- */
- public int getContentOffset() {
- return contentOffset;
- }
-
- /**
- * set contentOffset
- *
- * @param contentOffset the contentOffset to set
- */
- public void setContentOffset(int contentOffset) {
- this.contentOffset = contentOffset;
+ public static EsIdConfig create(DataFlowConfig dataFlowConfig) {
+ EsSinkConfig sinkConfig = (EsSinkConfig)
dataFlowConfig.getSinkConfig();
+ List<String> fields = sinkConfig.getFieldConfigs()
+ .stream()
+ .map(FieldConfig::getName)
+ .collect(Collectors.toList());
+ return EsIdConfig.builder()
+ .inlongGroupId(dataFlowConfig.getInlongGroupId())
+ .inlongStreamId(dataFlowConfig.getInlongStreamId())
+ .contentOffset(sinkConfig.getContentOffset())
+ .fieldOffset(sinkConfig.getFieldOffset())
+ .separator(sinkConfig.getSeparator())
+ .indexNamePattern(sinkConfig.getIndexNamePattern())
+ .fieldList(fields)
+ .build();
}
- /**
- * parseIndexName
- *
- * @param msgTime
- * @return
- */
public String parseIndexName(long msgTime) {
Date dtDate = new Date(msgTime);
String result = indexNamePattern;
- if (result.indexOf(PATTERN_MINUTE) >= 0) {
+ if (result.contains(PATTERN_MINUTE)) {
String strHour = FORMAT_MINUTE.get().format(dtDate);
result = result.replaceAll(REGEX_MINUTE, strHour);
}
- if (result.indexOf(PATTERN_HOUR) >= 0) {
+ if (result.contains(PATTERN_HOUR)) {
String strHour = FORMAT_HOUR.get().format(dtDate);
result = result.replaceAll(REGEX_HOUR, strHour);
}
- if (result.indexOf(PATTERN_DAY) >= 0) {
+ if (result.contains(PATTERN_DAY)) {
String strHour = FORMAT_DAY.get().format(dtDate);
result = result.replaceAll(REGEX_DAY, strHour);
}
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSink.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSink.java
index f283e5c855..a6d3dba5b3 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSink.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSink.java
@@ -17,7 +17,7 @@
package org.apache.inlong.sort.standalone.sink.elasticsearch;
-import org.apache.inlong.sort.standalone.sink.SinkContext;
+import org.apache.inlong.sort.standalone.sink.v2.SinkContext;
import org.apache.inlong.sort.standalone.utils.BufferQueue;
import org.apache.flume.Context;
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
index 87714c1149..e9dcb26a7f 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
@@ -17,19 +17,19 @@
package org.apache.inlong.sort.standalone.sink.elasticsearch;
-import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
+import org.apache.inlong.common.pojo.sort.SortClusterConfig;
+import org.apache.inlong.common.pojo.sort.SortTaskConfig;
+import org.apache.inlong.common.pojo.sort.node.EsNodeConfig;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
-import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
+import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder;
import org.apache.inlong.sort.standalone.config.pojo.InlongId;
import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
import org.apache.inlong.sort.standalone.metrics.audit.AuditUtils;
-import org.apache.inlong.sort.standalone.sink.SinkContext;
+import org.apache.inlong.sort.standalone.sink.v2.SinkContext;
import org.apache.inlong.sort.standalone.utils.BufferQueue;
-import org.apache.inlong.sort.standalone.utils.Constants;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
-import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.ClassUtils;
import org.apache.commons.lang3.StringUtils;
@@ -40,11 +40,13 @@ import org.apache.http.HttpHost;
import org.slf4j.Logger;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
/**
*
@@ -85,8 +87,10 @@ public class EsSinkContext extends SinkContext {
public static final boolean DEFAULT_IS_USE_INDEX_ID = false;
private Context sinkContext;
+ private EsNodeConfig esNodeConfig;
private String nodeId;
private Map<String, EsIdConfig> idConfigMap = new ConcurrentHashMap<>();
+ private ObjectMapper objectMapper = new ObjectMapper();
private final BufferQueue<EsIndexRequest> dispatchQueue;
private AtomicLong offerCounter = new AtomicLong(0);
private AtomicLong takeCounter = new AtomicLong(0);
@@ -134,48 +138,48 @@ public class EsSinkContext extends SinkContext {
LOG.info("SortTask:{},dispatchQueue:{},offer:{},take:{},back:{}",
taskName, dispatchQueue.size(), offerCounter.getAndSet(0),
takeCounter.getAndSet(0), backCounter.getAndSet(0));
- SortTaskConfig newSortTaskConfig =
SortClusterConfigHolder.getTaskConfig(taskName);
+ SortTaskConfig newSortTaskConfig =
SortConfigHolder.getTaskConfig(taskName);
if (this.sortTaskConfig != null &&
this.sortTaskConfig.equals(newSortTaskConfig)) {
return;
}
LOG.info("get new SortTaskConfig:taskName:{}:config:{}", taskName,
- new ObjectMapper().writeValueAsString(newSortTaskConfig));
+ objectMapper.writeValueAsString(newSortTaskConfig));
this.sortTaskConfig = newSortTaskConfig;
- this.sinkContext = new
Context(this.sortTaskConfig.getSinkParams());
- // parse the config of id and topic
- Map<String, EsIdConfig> newIdConfigMap = new ConcurrentHashMap<>();
- List<Map<String, String>> idList =
this.sortTaskConfig.getIdParams();
- ObjectMapper objectMapper = new ObjectMapper();
-
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
false);
- for (Map<String, String> idParam : idList) {
- String inlongGroupId = idParam.get(Constants.INLONG_GROUP_ID);
- String inlongStreamId =
idParam.get(Constants.INLONG_STREAM_ID);
- String uid = InlongId.generateUid(inlongGroupId,
inlongStreamId);
- String jsonIdConfig = objectMapper.writeValueAsString(idParam);
- EsIdConfig idConfig = objectMapper.readValue(jsonIdConfig,
EsIdConfig.class);
- idConfig.getFieldList();
- newIdConfigMap.put(uid, idConfig);
+ EsNodeConfig requestNodeConfig = (EsNodeConfig)
sortTaskConfig.getNodeConfig();
+ if (esNodeConfig == null || requestNodeConfig.getVersion() >
esNodeConfig.getVersion()) {
+ this.esNodeConfig = requestNodeConfig;
}
+ Map<String, String> properties =
this.sortTaskConfig.getNodeConfig().getProperties();
+ this.sinkContext = new Context(properties != null ? properties :
new HashMap<>());
// change current config
- this.idConfigMap = newIdConfigMap;
+ this.idConfigMap = this.sortTaskConfig.getClusters()
+ .stream()
+ .map(SortClusterConfig::getDataFlowConfigs)
+ .flatMap(Collection::stream)
+ .map(EsIdConfig::create)
+ .collect(Collectors.toMap(
+ config ->
InlongId.generateUid(config.getInlongGroupId(), config.getInlongStreamId()),
+ v -> v,
+ (flow1, flow2) -> flow1));
// rest client
- this.username = sinkContext.getString(KEY_USERNAME);
- this.password = sinkContext.getString(KEY_PASSWORD);
- this.bulkAction = sinkContext.getInteger(KEY_BULK_ACTION,
DEFAULT_BULK_ACTION);
- this.bulkSizeMb = sinkContext.getInteger(KEY_BULK_SIZE_MB,
DEFAULT_BULK_SIZE_MB);
- this.flushInterval = sinkContext.getInteger(KEY_FLUSH_INTERVAL,
DEFAULT_FLUSH_INTERVAL);
- this.concurrentRequests =
sinkContext.getInteger(KEY_CONCURRENT_REQUESTS, DEFAULT_CONCURRENT_REQUESTS);
- this.maxConnect = sinkContext.getInteger(KEY_MAX_CONNECT_TOTAL,
DEFAULT_MAX_CONNECT_TOTAL);
+ this.username = esNodeConfig.getUsername();
+ this.password = esNodeConfig.getPassword();
+ this.bulkAction = esNodeConfig.getBulkAction();
+ this.bulkSizeMb = esNodeConfig.getBulkSizeMb();
+ this.flushInterval = esNodeConfig.getFlushInterval();
+ this.concurrentRequests = esNodeConfig.getConcurrentRequests();
+ this.maxConnect = esNodeConfig.getMaxConnect();
+ this.keywordMaxLength = esNodeConfig.getKeywordMaxLength();
+ this.isUseIndexId = esNodeConfig.getIsUseIndexId();
+
this.maxConnectPerRoute =
sinkContext.getInteger(KEY_MAX_CONNECT_PER_ROUTE,
DEFAULT_MAX_CONNECT_PER_ROUTE);
this.connectionRequestTimeout =
sinkContext.getInteger(KEY_CONNECTION_REQUEST_TIMEOUT,
DEFAULT_CONNECTION_REQUEST_TIMEOUT);
this.socketTimeout = sinkContext.getInteger(KEY_SOCKET_TIMEOUT,
DEFAULT_SOCKET_TIMEOUT);
this.maxRedirects = sinkContext.getInteger(KEY_MAX_REDIRECTS,
DEFAULT_MAX_REDIRECTS);
this.logMaxLength = sinkContext.getInteger(KEY_LOG_MAX_LENGTH,
DEFAULT_LOG_MAX_LENGTH);
- this.keywordMaxLength =
sinkContext.getInteger(KEY_KEYWORD_MAX_LENGTH, DEFAULT_KEYWORD_MAX_LENGTH);
- this.isUseIndexId = sinkContext.getBoolean(KEY_IS_USE_INDEX_ID,
DEFAULT_IS_USE_INDEX_ID);
// http host
- this.strHttpHosts = sinkContext.getString(KEY_HTTP_HOSTS);
+ this.strHttpHosts = esNodeConfig.getHttpHosts();
if (!StringUtils.isBlank(strHttpHosts)) {
String[] strHttpHostArray = strHttpHosts.split("\\s+");
List<HttpHost> newHttpHosts = new
ArrayList<>(strHttpHostArray.length);
@@ -192,7 +196,7 @@ public class EsSinkContext extends SinkContext {
}
// log
LOG.info("end to get
SortTaskConfig:taskName:{}:newIdConfigMap:{}", taskName,
- new ObjectMapper().writeValueAsString(newIdConfigMap));
+ objectMapper.writeValueAsString(idConfigMap));
} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/test/java/SortClusterConfig.conf
b/inlong-sort-standalone/sort-standalone-source/src/test/java/SortClusterConfig.conf
index 10c51cd0fb..637698dfa7 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/test/java/SortClusterConfig.conf
+++
b/inlong-sort-standalone/sort-standalone-source/src/test/java/SortClusterConfig.conf
@@ -14,72 +14,129 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
+
{
- "clusterName":"esv3-sz-sz1",
- "sortTasks":[
- {
- "idParams":[
- {
- "inlongGroupId":"0fc00000046",
- "inlongStreamId":"",
- "separator":"|",
-
"indexNamePattern":"inlong0fc00000046_{yyyyMMdd}",
- "fieldNames":"ftime extinfo t1
t2 t3 t4",
- "fieldOffset":2,
- "contentOffset":0
- },
- {
- "inlongGroupId":"03600000045",
- "inlongStreamId":"",
- "separator":"|",
-
"indexNamePattern":"inlong03600000045_{yyyyMMdd}",
- "fieldNames":"ftime extinfo t1
t2 t3",
- "fieldOffset":2,
- "contentOffset":0
- },
- {
- "inlongGroupId":"05100054990",
- "inlongStreamId":"",
- "separator":"|",
-
"indexNamePattern":"inlong05100054990_{yyyyMMdd}",
- "fieldNames":"ftime extinfo
field1 field2 field3 field4",
- "fieldOffset":2,
- "contentOffset":0
- },
- {
- "inlongGroupId":"09c00014434",
- "inlongStreamId":"",
- "separator":"|",
-
"indexNamePattern":"inlong09c00014434_{yyyyMMdd}",
- "fieldNames":"ftime extinfo id
elog containerName cid setName ip",
- "fieldOffset":2,
- "contentOffset":0
- },
- {
- "inlongGroupId":"0c900035509",
- "inlongStreamId":"",
- "separator":"|",
-
"indexNamePattern":"inlong0c900035509_{yyyyMMdd}",
- "fieldNames":"ftime extinfo
statTime msgTime senderIp containerName cid kafkaGroupId KafkaClusterId topic
partitionId count size lastMaxOffset minOffset maxOffset minAckOffset
maxAckOffset",
- "fieldOffset":2,
- "contentOffset":0
- }
- ],
- "name":"sid_es_es-rmrv7g7a_v3",
- "sinkParams":{
- "httpHosts":"127.0.0.1:9200",
- "username":"elastic",
- "password":"elastic",
- "bulkAction":4000,
- "bulkSizeMb":10,
- "flushInterval":60,
- "concurrentRequests":5,
- "maxConnect":10,
- "keywordMaxLength":32767,
- "isUseIndexId":false
- },
- "type":"ElasticSearch"
- }
- ]
- }
+ "sortClusterName": "esv3-gz-gz1",
+ "tasks": [
+ {
+ "sortTaskName": "sid_es_es-rmrv7g7a_v3",
+ "clusters": [
+ {
+ "clusterTag": "default_cluster",
+ "mqClusterConfigs": [
+ {
+ "type": "PULSAR",
+ "version": "9",
+ "clusterName": "xxxx",
+ "adminUrl": "xxx",
+ "serviceUrl": "xxx",
+ "token": ""
+ }
+ ],
+ "dataFlowConfigs": [
+ {
+ "dataflowId": "553",
+ "version": 0,
+ "auditTag": "553",
+ "inlongGroupId": "",
+ "inlongStreamId": "0fc00000046",
+ "sourceConfig": {
+ "topic":
"persistent://tenant/namespace/0fc00000046",
+ "subscription": "sid_es_v3",
+ "encodingType": "UTF-8",
+ "deserializationConfig": {
+ "type": "INLONG_MSG",
+ "streamId": "0fc00000046"
+ },
+ "dataTypeConfig": {
+ "type": "CSV",
+ "delimiter": "|",
+ "escapeChar": "\\"
+ },
+ "fieldConfigs": [
+ {
+ "name": "name",
+ "formatInfo": {
+ "type": "string"
+ }
+ },
+ {
+ "name": "var1",
+ "formatInfo": {
+ "type": "string"
+ }
+ }
+ ]
+ },
+ "sinkConfig": {
+ "type": "ELASTICSEARCH",
+ "encodingType": null,
+ "fieldConfigs": [
+ {
+ "name": "ftime",
+ "formatInfo": {
+ "type": "string"
+ }
+ },
+ {
+ "name": "extinfo",
+ "formatInfo": {
+ "type": "string"
+ }
+ },
+ {
+ "name": "t1",
+ "formatInfo": {
+ "type": "string"
+ }
+ },
+ {
+ "name": "t2",
+ "formatInfo": {
+ "type": "string"
+ }
+ },
+ {
+ "name": "t3",
+ "formatInfo": {
+ "type": "string"
+ }
+ },
+ {
+ "name": "t4",
+ "formatInfo": {
+ "type": "string"
+ }
+ }
+ ],
+ "indexNamePattern":
"inlong0fc00000046_{yyyyMMdd}",
+ "contentOffset": 0,
+ "fieldOffset": 2,
+ "separator": "|"
+ },
+ "properties": null
+ }
+ ]
+ }
+ ],
+ "nodeConfig": {
+ "type": "ELASTICSEARCH",
+ "version": 4,
+ "nodeName": "sid_es_es-rmrv7g7a_v3",
+ "bulkAction": 4000,
+ "bulkSizeMb": 10,
+ "flushInterval": 60,
+ "concurrentRequests": 5,
+ "maxConnect": 10,
+ "keywordMaxLength": 32767,
+ "isUseIndexId": false,
+ "maxThreads": 2,
+ "auditSetName": null,
+ "httpHosts": "127.0.0.1:9200",
+ "username": "elastic",
+ "token": "password",
+ "password": "password"
+ }
+ }
+ ]
+}
\ No newline at end of file
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/test/resources/SortClusterConfig.conf
b/inlong-sort-standalone/sort-standalone-source/src/test/resources/SortClusterConfig.conf
index 10c51cd0fb..637698dfa7 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/test/resources/SortClusterConfig.conf
+++
b/inlong-sort-standalone/sort-standalone-source/src/test/resources/SortClusterConfig.conf
@@ -14,72 +14,129 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
+
{
- "clusterName":"esv3-sz-sz1",
- "sortTasks":[
- {
- "idParams":[
- {
- "inlongGroupId":"0fc00000046",
- "inlongStreamId":"",
- "separator":"|",
-
"indexNamePattern":"inlong0fc00000046_{yyyyMMdd}",
- "fieldNames":"ftime extinfo t1
t2 t3 t4",
- "fieldOffset":2,
- "contentOffset":0
- },
- {
- "inlongGroupId":"03600000045",
- "inlongStreamId":"",
- "separator":"|",
-
"indexNamePattern":"inlong03600000045_{yyyyMMdd}",
- "fieldNames":"ftime extinfo t1
t2 t3",
- "fieldOffset":2,
- "contentOffset":0
- },
- {
- "inlongGroupId":"05100054990",
- "inlongStreamId":"",
- "separator":"|",
-
"indexNamePattern":"inlong05100054990_{yyyyMMdd}",
- "fieldNames":"ftime extinfo
field1 field2 field3 field4",
- "fieldOffset":2,
- "contentOffset":0
- },
- {
- "inlongGroupId":"09c00014434",
- "inlongStreamId":"",
- "separator":"|",
-
"indexNamePattern":"inlong09c00014434_{yyyyMMdd}",
- "fieldNames":"ftime extinfo id
elog containerName cid setName ip",
- "fieldOffset":2,
- "contentOffset":0
- },
- {
- "inlongGroupId":"0c900035509",
- "inlongStreamId":"",
- "separator":"|",
-
"indexNamePattern":"inlong0c900035509_{yyyyMMdd}",
- "fieldNames":"ftime extinfo
statTime msgTime senderIp containerName cid kafkaGroupId KafkaClusterId topic
partitionId count size lastMaxOffset minOffset maxOffset minAckOffset
maxAckOffset",
- "fieldOffset":2,
- "contentOffset":0
- }
- ],
- "name":"sid_es_es-rmrv7g7a_v3",
- "sinkParams":{
- "httpHosts":"127.0.0.1:9200",
- "username":"elastic",
- "password":"elastic",
- "bulkAction":4000,
- "bulkSizeMb":10,
- "flushInterval":60,
- "concurrentRequests":5,
- "maxConnect":10,
- "keywordMaxLength":32767,
- "isUseIndexId":false
- },
- "type":"ElasticSearch"
- }
- ]
- }
+ "sortClusterName": "esv3-gz-gz1",
+ "tasks": [
+ {
+ "sortTaskName": "sid_es_es-rmrv7g7a_v3",
+ "clusters": [
+ {
+ "clusterTag": "default_cluster",
+ "mqClusterConfigs": [
+ {
+ "type": "PULSAR",
+ "version": "9",
+ "clusterName": "xxxx",
+ "adminUrl": "xxx",
+ "serviceUrl": "xxx",
+ "token": ""
+ }
+ ],
+ "dataFlowConfigs": [
+ {
+ "dataflowId": "553",
+ "version": 0,
+ "auditTag": "553",
+ "inlongGroupId": "",
+ "inlongStreamId": "0fc00000046",
+ "sourceConfig": {
+ "topic":
"persistent://tenant/namespace/0fc00000046",
+ "subscription": "sid_es_v3",
+ "encodingType": "UTF-8",
+ "deserializationConfig": {
+ "type": "INLONG_MSG",
+ "streamId": "0fc00000046"
+ },
+ "dataTypeConfig": {
+ "type": "CSV",
+ "delimiter": "|",
+ "escapeChar": "\\"
+ },
+ "fieldConfigs": [
+ {
+ "name": "name",
+ "formatInfo": {
+ "type": "string"
+ }
+ },
+ {
+ "name": "var1",
+ "formatInfo": {
+ "type": "string"
+ }
+ }
+ ]
+ },
+ "sinkConfig": {
+ "type": "ELASTICSEARCH",
+ "encodingType": null,
+ "fieldConfigs": [
+ {
+ "name": "ftime",
+ "formatInfo": {
+ "type": "string"
+ }
+ },
+ {
+ "name": "extinfo",
+ "formatInfo": {
+ "type": "string"
+ }
+ },
+ {
+ "name": "t1",
+ "formatInfo": {
+ "type": "string"
+ }
+ },
+ {
+ "name": "t2",
+ "formatInfo": {
+ "type": "string"
+ }
+ },
+ {
+ "name": "t3",
+ "formatInfo": {
+ "type": "string"
+ }
+ },
+ {
+ "name": "t4",
+ "formatInfo": {
+ "type": "string"
+ }
+ }
+ ],
+ "indexNamePattern":
"inlong0fc00000046_{yyyyMMdd}",
+ "contentOffset": 0,
+ "fieldOffset": 2,
+ "separator": "|"
+ },
+ "properties": null
+ }
+ ]
+ }
+ ],
+ "nodeConfig": {
+ "type": "ELASTICSEARCH",
+ "version": 4,
+ "nodeName": "sid_es_es-rmrv7g7a_v3",
+ "bulkAction": 4000,
+ "bulkSizeMb": 10,
+ "flushInterval": 60,
+ "concurrentRequests": 5,
+ "maxConnect": 10,
+ "keywordMaxLength": 32767,
+ "isUseIndexId": false,
+ "maxThreads": 2,
+ "auditSetName": null,
+ "httpHosts": "127.0.0.1:9200",
+ "username": "elastic",
+ "token": "password",
+ "password": "password"
+ }
+ }
+ ]
+}
\ No newline at end of file