This is an automated email from the ASF dual-hosted git repository.
wakefu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new d7aad5440e [INLONG-11790][Manager] CLS uses topic id as the unique key
(#11791)
d7aad5440e is described below
commit d7aad5440ee7d3e19cacbec85462089cc73737e0
Author: fuweng11 <[email protected]>
AuthorDate: Mon Mar 3 11:55:25 2025 +0800
[INLONG-11790][Manager] CLS uses topic id as the unique key (#11791)
---
.../inlong/manager/common/consts/SourceType.java | 1 +
.../service/resource/sink/cls/ClsOperator.java | 38 +++++++++++++++-------
.../resource/sink/cls/ClsResourceOperator.java | 26 ++++++++++-----
.../manager/service/sink/cls/ClsSinkOperator.java | 15 +++++++++
4 files changed, 60 insertions(+), 20 deletions(-)
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java
index 263950297d..26cd94f778 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java
@@ -49,6 +49,7 @@ public class SourceType extends StreamType {
put(FILE, TaskTypeEnum.FILE);
put(COS, TaskTypeEnum.COS);
+ put(SQL, TaskTypeEnum.SQL);
put(MYSQL_BINLOG, TaskTypeEnum.BINLOG);
put(POSTGRESQL, TaskTypeEnum.POSTGRES);
put(ORACLE, TaskTypeEnum.ORACLE);
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/cls/ClsOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/cls/ClsOperator.java
index f5162d1cc0..a89adf5d39 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/cls/ClsOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/cls/ClsOperator.java
@@ -58,7 +58,7 @@ public class ClsOperator {
@Value("${cls.manager.endpoint}")
private String endpoint;
private static final Logger LOG =
LoggerFactory.getLogger(ClsOperator.class);
- private static final String TOPIC_NAME = "topicName";
+ private static final String TOPIC_ID = "topicId";
private static final String LOG_SET_ID = "logsetId";
private static final long PRECISE_SEARCH = 1L;
@@ -117,12 +117,12 @@ public class ClsOperator {
}
/**
- * Describe cls topicId by topic name
+ * Describe cls topicName by topic id
*/
- public String describeTopicIDByTopicName(String topicName, String
logSetId, String secretId, String secretKey,
+ public String describeTopicNameByTopicId(String topicId, String logSetId,
String secretId, String secretKey,
String region) {
ClsClient clsClient = getClsClient(secretId, secretKey, region);
- Filter[] filters = getDescribeFilters(topicName, logSetId);
+ Filter[] filters = getDescribeFilters(topicId, logSetId);
DescribeTopicsRequest req = new DescribeTopicsRequest();
req.setFilters(filters);
req.setPreciseSearch(PRECISE_SEARCH);
@@ -130,7 +130,7 @@ public class ClsOperator {
DescribeTopicsResponse describeTopicsResponse =
clsClient.DescribeTopics(req);
if (ArrayUtils.isNotEmpty(describeTopicsResponse.getTopics())) {
TopicInfo[] topics = describeTopicsResponse.getTopics();
- return topics[0].getTopicId();
+ return topics[0].getTopicName();
}
return null;
} catch (Exception e) {
@@ -140,17 +140,33 @@ public class ClsOperator {
}
}
- public Filter[] getDescribeFilters(String topicName, String logSetId) {
- Filter topicNameFilter = new Filter();
- topicNameFilter.setKey(TOPIC_NAME);
- String[] topicNameFilterValues = new String[]{topicName};
- topicNameFilter.setValues(topicNameFilterValues);
+ public void modifyTopicNameByTopicId(String topicId, String topicName,
String secretId, String secretKey,
+ String region) {
+ ClsClient clsClient = getClsClient(secretId, secretKey, region);
+ ModifyTopicRequest req = new ModifyTopicRequest();
+ req.setTopicId(topicId);
+ req.setTopicName(topicName);
+ try {
+ ModifyTopicResponse modifyTopicResponse =
clsClient.ModifyTopic(req);
+ LOG.info("modify cls topic name success for topicId={},
topicName={}", topicId, topicName);
+ } catch (Exception e) {
+ String errMsg = "modify cls topic name failed: " + e.getMessage();
+ LOG.error(errMsg, e);
+ throw new BusinessException(errMsg);
+ }
+ }
+
+ public Filter[] getDescribeFilters(String topicId, String logSetId) {
+ Filter topicIdFilter = new Filter();
+ topicIdFilter.setKey(TOPIC_ID);
+ String[] topicIdFilterValues = new String[]{topicId};
+ topicIdFilter.setValues(topicIdFilterValues);
Filter logSetIdFilter = new Filter();
logSetIdFilter.setKey(LOG_SET_ID);
String[] logSetFilterValues = new String[]{logSetId};
logSetIdFilter.setValues(logSetFilterValues);
- return new Filter[]{topicNameFilter, logSetIdFilter};
+ return new Filter[]{topicIdFilter, logSetIdFilter};
}
/**
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/cls/ClsResourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/cls/ClsResourceOperator.java
index ced68be0ad..8148acdcb9 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/cls/ClsResourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/cls/ClsResourceOperator.java
@@ -39,6 +39,8 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import java.util.Objects;
+
@Service
public class ClsResourceOperator extends
AbstractStandaloneSinkResourceOperator {
@@ -80,8 +82,7 @@ public class ClsResourceOperator extends
AbstractStandaloneSinkResourceOperator
ClsDataNodeDTO clsDataNode = getClsDataNode(sinkInfo);
ClsSinkDTO clsSinkDTO = JsonUtils.parseObject(sinkInfo.getExtParams(),
ClsSinkDTO.class);
try {
- String topicId = getTopicID(clsDataNode, clsSinkDTO);
- clsSinkDTO.setTopicId(topicId);
+ createOrUpdateTopicName(clsDataNode, clsSinkDTO);
sinkInfo.setExtParams(JsonUtils.toJsonString(clsSinkDTO));
// create topic index by tokenizer
clsOperator.createTopicIndex(clsSinkDTO.getTokenizer(),
clsSinkDTO.getTopicId(),
@@ -101,19 +102,26 @@ public class ClsResourceOperator extends
AbstractStandaloneSinkResourceOperator
}
}
- private String getTopicID(ClsDataNodeDTO clsDataNode, ClsSinkDTO
clsSinkDTO)
+ private void createOrUpdateTopicName(ClsDataNodeDTO clsDataNode,
ClsSinkDTO clsSinkDTO)
throws Exception {
- String topicId =
clsOperator.describeTopicIDByTopicName(clsSinkDTO.getTopicName(),
clsDataNode.getLogSetId(),
- clsDataNode.getManageSecretId(),
clsDataNode.getManageSecretKey(),
- clsDataNode.getRegion());
- if (StringUtils.isBlank(topicId)) {
+ String topicName = clsSinkDTO.getTopicName();
+ if (StringUtils.isBlank(clsSinkDTO.getTopicId())) {
// if topic don't exist, create topic in cls
- topicId =
clsOperator.createTopicReturnTopicId(clsSinkDTO.getTopicName(),
clsDataNode.getLogSetId(),
+ String topicId =
clsOperator.createTopicReturnTopicId(clsSinkDTO.getTopicName(),
clsDataNode.getLogSetId(),
clsSinkDTO.getTag(), clsSinkDTO.getStorageDuration(),
clsDataNode.getManageSecretId(),
clsDataNode.getManageSecretKey(),
clsDataNode.getRegion());
+ clsSinkDTO.setTopicId(topicId);
+ } else {
+ topicName =
clsOperator.describeTopicNameByTopicId(clsSinkDTO.getTopicId(),
clsDataNode.getLogSetId(),
+ clsDataNode.getManageSecretId(),
clsDataNode.getManageSecretKey(),
+ clsDataNode.getRegion());
+ if (!Objects.equals(topicName, clsSinkDTO.getTopicName())) {
+ clsOperator.modifyTopicNameByTopicId(clsSinkDTO.getTopicId(),
clsSinkDTO.getTopicName(),
+ clsDataNode.getManageSecretId(),
clsDataNode.getManageSecretKey(),
+ clsDataNode.getRegion());
+ }
}
- return topicId;
}
private void updateSinkInfo(SinkInfo sinkInfo, ClsSinkDTO clsSinkDTO) {
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/cls/ClsSinkOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/cls/ClsSinkOperator.java
index ab6c54fa51..89b580229c 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/cls/ClsSinkOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/cls/ClsSinkOperator.java
@@ -45,9 +45,11 @@ import
org.apache.inlong.manager.pojo.sink.cls.ClsSinkRequest;
import org.apache.inlong.manager.pojo.sort.util.FieldInfoUtils;
import org.apache.inlong.manager.pojo.stream.InlongStreamExtParam;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.service.resource.sink.cls.ClsOperator;
import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -74,6 +76,8 @@ public class ClsSinkOperator extends AbstractSinkOperator {
private ObjectMapper objectMapper;
@Autowired
private DataNodeEntityMapper dataNodeEntityMapper;
+ @Autowired
+ private ClsOperator clsOperator;
@Override
protected void setTargetEntity(SinkRequest request, StreamSinkEntity
targetEntity) {
@@ -125,6 +129,17 @@ public class ClsSinkOperator extends AbstractSinkOperator {
CommonBeanUtils.copyProperties(entity, sink, true);
CommonBeanUtils.copyProperties(dto, sink, true);
CommonBeanUtils.copyProperties(clsDataNodeDTO, sink, true);
+ if (StringUtils.isNotBlank(sink.getTopicId())) {
+ try {
+ String topicName =
+
clsOperator.describeTopicNameByTopicId(sink.getTopicId(),
clsDataNodeDTO.getLogSetId(),
+ clsDataNodeDTO.getManageSecretId(),
clsDataNodeDTO.getManageSecretKey(),
+ clsDataNodeDTO.getRegion());
+ sink.setTopicName(topicName);
+ } catch (Exception e) {
+ LOGGER.error("get cls topic name failed for sinId={},
topicId={}", sink.getId(), sink.getTopicId(), e);
+ }
+ }
List<SinkField> sinkFields = getSinkFields(entity.getId());
sink.setSinkFieldList(sinkFields);
return sink;