This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 292bbb7547 [INLONG-9060][Manager] Fix manager return wrong sink
configuration to sort standalone (#9067)
292bbb7547 is described below
commit 292bbb75473e7792a4e6ab920eeadfa2117d8398
Author: castor <[email protected]>
AuthorDate: Tue Oct 24 12:49:41 2023 +0800
[INLONG-9060][Manager] Fix manager return wrong sink configuration to sort
standalone (#9067)
---
.../mappers/StreamSinkFieldEntityMapper.xml | 1 +
.../pojo/sort/standalone/SortFieldInfo.java | 1 +
.../service/core/impl/SortClusterServiceImpl.java | 35 ++++++++++++--
.../service/resource/sink/cls/ClsOperator.java | 53 ++++++++++++----------
.../resource/sink/cls/ClsResourceOperator.java | 9 ++--
.../service/sink/pulsar/PulsarSinkOperator.java | 6 ++-
.../resources/application-unit-test.properties | 3 ++
.../src/main/resources/application-dev.properties | 3 ++
.../src/main/resources/application-prod.properties | 3 ++
.../src/main/resources/application-test.properties | 3 ++
.../src/main/resources/application.properties | 3 ++
11 files changed, 85 insertions(+), 35 deletions(-)
diff --git
a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml
b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml
index 8273c33343..2fae94247f 100644
---
a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml
+++
b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml
@@ -123,6 +123,7 @@
select
inlong_group_id,
inlong_stream_id,
+ sink_id,
field_name
from stream_sink_field
where is_deleted = 0
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/standalone/SortFieldInfo.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/standalone/SortFieldInfo.java
index b4fcbd9fd9..fc8ad99156 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/standalone/SortFieldInfo.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/standalone/SortFieldInfo.java
@@ -24,5 +24,6 @@ public class SortFieldInfo {
private String inlongGroupId;
private String inlongStreamId;
+ private Integer sinkId;
private String fieldName;
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterServiceImpl.java
index 6af22b75b6..170b295249 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterServiceImpl.java
@@ -20,11 +20,14 @@ package org.apache.inlong.manager.service.core.impl;
import org.apache.inlong.common.pojo.sortstandalone.SortClusterConfig;
import org.apache.inlong.common.pojo.sortstandalone.SortClusterResponse;
import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
+import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.dao.entity.DataNodeEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.pojo.node.DataNodeInfo;
import org.apache.inlong.manager.pojo.sort.standalone.SortFieldInfo;
+import org.apache.inlong.manager.pojo.sort.standalone.SortSourceStreamInfo;
import org.apache.inlong.manager.pojo.sort.standalone.SortTaskInfo;
+import org.apache.inlong.manager.pojo.stream.InlongStreamExtParam;
import org.apache.inlong.manager.service.core.SortClusterService;
import org.apache.inlong.manager.service.core.SortConfigLoader;
import org.apache.inlong.manager.service.node.DataNodeOperator;
@@ -34,6 +37,7 @@ import
org.apache.inlong.manager.service.sink.StreamSinkOperator;
import com.google.gson.Gson;
import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -76,7 +80,9 @@ public class SortClusterServiceImpl implements
SortClusterService {
private static final String KEY_GROUP_ID = "inlongGroupId";
private static final String KEY_STREAM_ID = "inlongStreamId";
- private Map<String, List<String>> fieldMap;
+ private static final String FILED_OFFSET = "fieldOffset";
+ // key: sink id, value: fileNames
+ private Map<Integer, List<String>> fieldMap;
// key : sort cluster name, value : md5
private Map<String, String> sortClusterMd5Map = new ConcurrentHashMap<>();
@@ -84,6 +90,8 @@ public class SortClusterServiceImpl implements
SortClusterService {
private Map<String, SortClusterConfig> sortClusterConfigMap = new
ConcurrentHashMap<>();
// key : sort cluster name, value : error log
private Map<String, String> sortClusterErrorLogMap = new
ConcurrentHashMap<>();
+ // key: group id ,value: {key: stream id, value: stream info}
+ private Map<String, Map<String, SortSourceStreamInfo>> allStreams;
private long reloadInterval;
@@ -169,7 +177,7 @@ public class SortClusterServiceImpl implements
SortClusterService {
List<SortFieldInfo> fieldInfos = sortConfigLoader.loadAllFields();
fieldMap = new HashMap<>();
fieldInfos.forEach(info -> {
- List<String> fields =
fieldMap.computeIfAbsent(info.getInlongGroupId(), k -> new ArrayList<>());
+ List<String> fields = fieldMap.computeIfAbsent(info.getSinkId(), k
-> new ArrayList<>());
fields.add(info.getFieldName());
});
@@ -183,6 +191,12 @@ public class SortClusterServiceImpl implements
SortClusterService {
&& StringUtils.isNotBlank(dto.getSinkType()))
.collect(Collectors.groupingBy(SortTaskInfo::getSortClusterName));
+ // reload all streams
+ allStreams = sortConfigLoader.loadAllStreams()
+ .stream()
+
.collect(Collectors.groupingBy(SortSourceStreamInfo::getInlongGroupId,
+
Collectors.toMap(SortSourceStreamInfo::getInlongStreamId, info -> info)));
+
// get all stream sinks
Map<String, List<StreamSinkEntity>> task2AllStreams =
sinkEntities.stream()
.filter(entity ->
StringUtils.isNotBlank(entity.getInlongClusterName()))
@@ -265,8 +279,10 @@ public class SortClusterServiceImpl implements
SortClusterService {
.map(streamSink -> {
try {
StreamSinkOperator operator =
sinkOperatorFactory.getInstance(streamSink.getSinkType());
- List<String> fields =
fieldMap.get(streamSink.getInlongGroupId());
- return operator.parse2IdParams(streamSink, fields,
dataNodeInfo);
+ List<String> fields = fieldMap.get(streamSink.getId());
+ Map<String, String> params =
operator.parse2IdParams(streamSink, fields, dataNodeInfo);
+ setFiledOffset(streamSink, params);
+ return params;
} catch (Exception e) {
LOGGER.error("fail to parse id params of groupId={},
streamId={} name={}, type={}}",
streamSink.getInlongGroupId(),
streamSink.getInlongStreamId(),
@@ -278,6 +294,17 @@ public class SortClusterServiceImpl implements
SortClusterService {
.collect(Collectors.toList());
}
+ private void setFiledOffset(StreamSinkEntity streamSink, Map<String,
String> params) {
+
+ SortSourceStreamInfo sortSourceStreamInfo =
allStreams.get(streamSink.getInlongGroupId())
+ .get(streamSink.getInlongStreamId());
+ InlongStreamExtParam inlongStreamExtParam = JsonUtils.parseObject(
+ sortSourceStreamInfo.getExtParams(),
InlongStreamExtParam.class);
+ if (ObjectUtils.anyNotNull(inlongStreamExtParam) &&
!inlongStreamExtParam.getUseExtendedFields()) {
+ params.put(FILED_OFFSET, String.valueOf(0));
+ }
+ }
+
private Map<String, String> parseSinkParams(DataNodeInfo nodeInfo) {
DataNodeOperator operator =
dataNodeOperatorFactory.getInstance(nodeInfo.getType());
return operator.parse2SinkParams(nodeInfo);
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/cls/ClsOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/cls/ClsOperator.java
index 9aa29d9993..d15223b861 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/cls/ClsOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/cls/ClsOperator.java
@@ -47,6 +47,7 @@ import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
@@ -55,26 +56,28 @@ import java.util.List;
@Service
public class ClsOperator {
+ @Value("${cls.manager.endpoint}")
+ private String endpoint;
private static final Logger LOG =
LoggerFactory.getLogger(ClsOperator.class);
private static final String TOPIC_NAME = "topicName";
private static final String LOG_SET_ID = "logsetId";
private static final long PRECISE_SEARCH = 1L;
- public String createTopicReturnTopicId(String topicName, String logSetId,
String tag, String secretId,
- String secretKey, String endPoint, String region)
+ public String createTopicReturnTopicId(String topicName, String logSetId,
String tag, Integer storageDuration,
+ String secretId, String secretKey, String region)
throws TencentCloudSDKException {
- ClsClient client = getClsClient(secretId, secretKey, endPoint, region);
- CreateTopicRequest req = getCreateTopicRequest(tag, logSetId,
topicName);
+ ClsClient client = getClsClient(secretId, secretKey, region);
+ CreateTopicRequest req = getCreateTopicRequest(tag, logSetId,
topicName, storageDuration);
CreateTopicResponse resp = client.CreateTopic(req);
LOG.info("create cls topic success for topicName = {}, topicId = {},
requestId = {}", topicName,
resp.getTopicId(), resp.getRequestId());
- updateTopicTag(resp.getTopicId(), tag, secretId, secretKey, endPoint,
region);
+ updateTopicTag(resp.getTopicId(), tag, secretId, secretKey, region);
return resp.getTopicId();
}
- public void updateTopicTag(String topicId, String tag, String secretId,
- String secretKey, String endPoint, String region) throws
TencentCloudSDKException {
- ClsClient client = getClsClient(secretId, secretKey, endPoint, region);
+ public void updateTopicTag(String topicId, String tag, String secretId,
String secretKey, String region)
+ throws TencentCloudSDKException {
+ ClsClient client = getClsClient(secretId, secretKey, region);
ModifyTopicRequest modifyTopicRequest = new ModifyTopicRequest();
modifyTopicRequest.setTags(convertTags(tag.split(InlongConstants.CENTER_LINE)));
modifyTopicRequest.setTopicId(topicId);
@@ -85,22 +88,22 @@ public class ClsOperator {
/**
* Create topic index by tokenizer
*/
- public void createTopicIndex(String tokenizer, String topicId, String
secretId, String secretKey, String endPoint,
- String region) throws BusinessException {
+ public void createTopicIndex(String tokenizer, String topicId, String
secretId, String secretKey, String region)
+ throws BusinessException {
LOG.debug("create topic index start for topicId = {}, tokenizer = {}",
topicId, tokenizer);
if (StringUtils.isBlank(tokenizer)) {
LOG.warn("tokenizer is blank for topic = {}", topicId);
return;
}
- FullTextInfo topicIndexFullText = getTopicIndexFullText(secretId,
secretKey, endPoint, region, topicId);
+ FullTextInfo topicIndexFullText = getTopicIndexFullText(secretId,
secretKey, region, topicId);
if (ObjectUtils.anyNotNull(topicIndexFullText)) {
// if topic index exist, update
LOG.debug("cls topic is exist and update for topicId =
{},tokenizer = {}", topicId, tokenizer);
- updateTopicIndex(tokenizer, topicId, secretId, secretKey,
endPoint, region);
+ updateTopicIndex(tokenizer, topicId, secretId, secretKey, region);
return;
}
- ClsClient clsClient = getClsClient(secretId, secretKey, endPoint,
region);
+ ClsClient clsClient = getClsClient(secretId, secretKey, region);
CreateIndexRequest req = getCreateIndexRequest(tokenizer, topicId);
try {
CreateIndexResponse createIndexResponse =
clsClient.CreateIndex(req);
@@ -117,9 +120,9 @@ public class ClsOperator {
/**
* Describe cls topicId by topic name
*/
- public String describeTopicIDByTopicName(String topicName, String
logSetId, String tag, String secretId,
- String secretKey, String endPoint, String region) {
- ClsClient clsClient = getClsClient(secretId, secretKey, endPoint,
region);
+ public String describeTopicIDByTopicName(String topicName, String
logSetId, String secretId, String secretKey,
+ String region) {
+ ClsClient clsClient = getClsClient(secretId, secretKey, region);
Filter[] filters = getDescribeFilters(topicName, logSetId);
DescribeTopicsRequest req = new DescribeTopicsRequest();
req.setFilters(filters);
@@ -154,10 +157,9 @@ public class ClsOperator {
/**
* Get cls topic index full text
*/
- public FullTextInfo getTopicIndexFullText(String secretId, String
secretKey, String endPoint, String region,
- String topicId) {
+ public FullTextInfo getTopicIndexFullText(String secretId, String
secretKey, String region, String topicId) {
- ClsClient clsClient = getClsClient(secretId, secretKey, endPoint,
region);
+ ClsClient clsClient = getClsClient(secretId, secretKey, region);
DescribeIndexRequest req = new DescribeIndexRequest();
req.setTopicId(topicId);
try {
@@ -170,9 +172,8 @@ public class ClsOperator {
}
}
- public void updateTopicIndex(String tokenizer, String topicId,
- String secretId, String secretKey, String endPoint, String region)
{
- ClsClient clsClient = getClsClient(secretId, secretKey, endPoint,
region);
+ public void updateTopicIndex(String tokenizer, String topicId, String
secretId, String secretKey, String region) {
+ ClsClient clsClient = getClsClient(secretId, secretKey, region);
RuleInfo ruleInfo = new RuleInfo();
FullTextInfo fullTextInfo = new FullTextInfo();
fullTextInfo.setTokenizer(tokenizer);
@@ -192,11 +193,11 @@ public class ClsOperator {
}
}
- public ClsClient getClsClient(String secretId, String secretKey, String
endPoint, String region) {
+ public ClsClient getClsClient(String secretId, String secretKey, String
region) {
Credential cred = new Credential(secretId,
secretKey);
HttpProfile httpProfile = new HttpProfile();
- httpProfile.setEndpoint(endPoint);
+ httpProfile.setEndpoint(endpoint);
ClientProfile clientProfile = new ClientProfile();
clientProfile.setHttpProfile(httpProfile);
@@ -215,11 +216,13 @@ public class ClsOperator {
return req;
}
- public CreateTopicRequest getCreateTopicRequest(String tags, String
logSetId, String topicName) {
+ public CreateTopicRequest getCreateTopicRequest(String tags, String
logSetId, String topicName,
+ Integer storageDuration) {
CreateTopicRequest req = new CreateTopicRequest();
req.setTags(convertTags(tags.split(InlongConstants.CENTER_LINE)));
req.setLogsetId(logSetId);
req.setTopicName(topicName);
+ req.setPeriod(storageDuration == null ? null :
Long.valueOf(storageDuration));
return req;
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/cls/ClsResourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/cls/ClsResourceOperator.java
index c16391907a..173a139758 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/cls/ClsResourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/cls/ClsResourceOperator.java
@@ -86,7 +86,7 @@ public class ClsResourceOperator extends
AbstractStandaloneSinkResourceOperator
// create topic index by tokenizer
clsOperator.createTopicIndex(clsSinkDTO.getTokenizer(),
clsSinkDTO.getTopicId(),
clsDataNode.getManageSecretId(),
- clsDataNode.getManageSecretKey(),
clsDataNode.getEndpoint(), clsDataNode.getRegion());
+ clsDataNode.getManageSecretKey(), clsDataNode.getRegion());
// update set topic id into sink info
updateSinkInfo(sinkInfo, clsSinkDTO);
String info = "success to create cls resource";
@@ -104,14 +104,13 @@ public class ClsResourceOperator extends
AbstractStandaloneSinkResourceOperator
private String getTopicID(ClsDataNodeDTO clsDataNode, ClsSinkDTO
clsSinkDTO)
throws TencentCloudSDKException {
String topicId =
clsOperator.describeTopicIDByTopicName(clsSinkDTO.getTopicName(),
clsDataNode.getLogSetId(),
- clsSinkDTO.getTag(),
- clsDataNode.getManageSecretId(),
clsDataNode.getManageSecretKey(), clsDataNode.getEndpoint(),
+ clsDataNode.getManageSecretId(),
clsDataNode.getManageSecretKey(),
clsDataNode.getRegion());
if (StringUtils.isBlank(topicId)) {
// if topic don't exist, create topic in cls
topicId =
clsOperator.createTopicReturnTopicId(clsSinkDTO.getTopicName(),
clsDataNode.getLogSetId(),
- clsSinkDTO.getTag(), clsDataNode.getManageSecretId(),
clsDataNode.getManageSecretKey(),
- clsDataNode.getEndpoint(),
+ clsSinkDTO.getTag(), clsSinkDTO.getStorageDuration(),
clsDataNode.getManageSecretId(),
+ clsDataNode.getManageSecretKey(),
clsDataNode.getRegion());
}
return topicId;
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/pulsar/PulsarSinkOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/pulsar/PulsarSinkOperator.java
index 6a1657b1ec..dcad49ae90 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/pulsar/PulsarSinkOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/pulsar/PulsarSinkOperator.java
@@ -46,6 +46,8 @@ import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Map;
+import static
org.apache.inlong.manager.common.consts.InlongConstants.PULSAR_TOPIC_FORMAT;
+
/**
* Pulsar sink operator
*/
@@ -123,7 +125,9 @@ public class PulsarSinkOperator extends
AbstractSinkOperator {
}
private String getFullTopicName(PulsarSinkDTO pulsarSinkDTO) {
- return pulsarSinkDTO.getPulsarTenant() + "/" +
pulsarSinkDTO.getNamespace() + "/" + pulsarSinkDTO.getTopic();
+ return String.format(PULSAR_TOPIC_FORMAT,
pulsarSinkDTO.getPulsarTenant(), pulsarSinkDTO.getNamespace(),
+ pulsarSinkDTO.getTopic());
+
}
}
diff --git
a/inlong-manager/manager-test/src/main/resources/application-unit-test.properties
b/inlong-manager/manager-test/src/main/resources/application-unit-test.properties
index 862eba4706..aa9155e735 100644
---
a/inlong-manager/manager-test/src/main/resources/application-unit-test.properties
+++
b/inlong-manager/manager-test/src/main/resources/application-unit-test.properties
@@ -67,3 +67,6 @@ common.http-client.validateAfterInactivity=5000
common.http-client.connectionTimeout=3000
common.http-client.readTimeout=10000
common.http-client.connectionRequestTimeout=3000
+
+# tencent cloud log service endpoint, The Operator cls resource by it
+cls.manager.endpoint=127.0.0.1
diff --git
a/inlong-manager/manager-web/src/main/resources/application-dev.properties
b/inlong-manager/manager-web/src/main/resources/application-dev.properties
index 376d9b9f73..0bcebdc06e 100644
--- a/inlong-manager/manager-web/src/main/resources/application-dev.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-dev.properties
@@ -109,3 +109,6 @@ group.deleted.batchSize=100
group.deleted.enabled=false
metrics.audit.proxy.hosts=127.0.0.1:10081
+
+# tencent cloud log service endpoint, The Operator cls resource by it
+cls.manager.endpoint=127.0.0.1
diff --git
a/inlong-manager/manager-web/src/main/resources/application-prod.properties
b/inlong-manager/manager-web/src/main/resources/application-prod.properties
index c47fc92334..a9c55b39b3 100644
--- a/inlong-manager/manager-web/src/main/resources/application-prod.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-prod.properties
@@ -108,3 +108,6 @@ group.deleted.batchSize=100
group.deleted.enabled=false
metrics.audit.proxy.hosts=127.0.0.1:10081
+
+# tencent cloud log service endpoint, The Operator cls resource by it
+cls.manager.endpoint=127.0.0.1
diff --git
a/inlong-manager/manager-web/src/main/resources/application-test.properties
b/inlong-manager/manager-web/src/main/resources/application-test.properties
index 376d9b9f73..0bcebdc06e 100644
--- a/inlong-manager/manager-web/src/main/resources/application-test.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-test.properties
@@ -109,3 +109,6 @@ group.deleted.batchSize=100
group.deleted.enabled=false
metrics.audit.proxy.hosts=127.0.0.1:10081
+
+# tencent cloud log service endpoint, The Operator cls resource by it
+cls.manager.endpoint=127.0.0.1
diff --git
a/inlong-manager/manager-web/src/main/resources/application.properties
b/inlong-manager/manager-web/src/main/resources/application.properties
index c434645b2f..6b56dfb3d9 100644
--- a/inlong-manager/manager-web/src/main/resources/application.properties
+++ b/inlong-manager/manager-web/src/main/resources/application.properties
@@ -63,3 +63,6 @@ openapi.auth.enabled=false
# Audit view by role, see audit id definitions:
https://inlong.apache.org/docs/modules/audit/overview#audit-id
audit.admin.ids=3,4,5,6
audit.user.ids=3,4,5,6
+
+# tencent cloud log service endpoint, The Operator cls resource by it
+cls.manager.endpoint=127.0.0.1