This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 25e03cedea [INLONG-8845][Manager] Support Tencent Cloud Log Service 
data flow (#8892)
25e03cedea is described below

commit 25e03cedea49939072fde3bfb3703ca4ab085cfc
Author: castor <[email protected]>
AuthorDate: Sun Sep 17 22:01:01 2023 +0800

    [INLONG-8845][Manager] Support Tencent Cloud Log Service data flow (#8892)
---
 .../inlong/manager/common/consts/DataNodeType.java |   5 +
 .../manager/common/consts/InlongConstants.java     |   2 +
 .../inlong/manager/common/consts/SinkType.java     |   5 +
 .../pojo/group/pulsar/InlongPulsarRequest.java     |   2 +-
 .../pojo/group/pulsar/InlongPulsarTopicInfo.java   |   2 +-
 .../manager/pojo/node/cls/ClsDataNodeDTO.java      |  94 +++++++++++
 .../manager/pojo/node/cls/ClsDataNodeInfo.java     |  81 +++++++++
 .../manager/pojo/node/cls/ClsDataNodeRequest.java  |  71 ++++++++
 .../inlong/manager/pojo/sink/cls/ClsSink.java      |  90 ++++++++++
 .../inlong/manager/pojo/sink/cls/ClsSinkDTO.java   |  76 +++++++++
 .../cls/ClsSinkRequest.java}                       |  41 +++--
 inlong-manager/manager-service/pom.xml             |   5 +-
 .../service/group/InlongGroupOperator4Pulsar.java  |   2 +-
 .../service/node/cls/ClsDataNodeOperator.java      | 111 +++++++++++++
 .../resource/sink/cls/ClsResourceOperator.java     | 183 +++++++++++++++++++++
 .../sink/pulsar/PulsarResourceOperator.java        |   6 +
 .../manager/service/sink/cls/ClsSinkOperator.java  | 131 +++++++++++++++
 licenses/inlong-manager/LICENSE                    |   1 +
 pom.xml                                            |   6 +
 19 files changed, 888 insertions(+), 26 deletions(-)

diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
index 6e1002612c..e14a98195f 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
@@ -36,5 +36,10 @@ public class DataNodeType {
     public static final String ORACLE = "ORACLE";
     public static final String SQLSERVER = "SQLSERVER";
     public static final String MONGODB = "MONGODB";
+    /**
+     * Tencent cloud log service
+     * Details: <a href="https://www.tencentcloud.com/products/cls";>CLS</a>
+     */
+    public static final String CLS = "CLS";
     public static final String PULSAR = "PULSAR";
 }
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
index 3683280c6d..7772ef4ba0 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
@@ -60,6 +60,8 @@ public class InlongConstants {
 
     public static final String UNDERSCORE = "_";
 
+    public static final String CENTER_LINE = "\\|";
+
     public static final String LEFT_BRACKET = "(";
 
     public static final String PERCENT = "%";
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
index f11e9daf6d..965e809f51 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
@@ -34,4 +34,9 @@ public class SinkType extends StreamType {
     public static final String STARROCKS = "STARROCKS";
     public static final String KUDU = "KUDU";
     public static final String REDIS = "REDIS";
+    /**
+     * Tencent cloud log service
+     * Details: <a href="https://www.tencentcloud.com/products/cls";>CLS</a>
+     */
+    public static final String CLS = "CLS";
 }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarRequest.java
index 443a5af472..9d93e39ee5 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarRequest.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarRequest.java
@@ -42,7 +42,7 @@ public class InlongPulsarRequest extends InlongGroupRequest {
      *  from {@link 
org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterRequest}
      */
     @ApiModelProperty(value = "Pulsar tenant")
-    private String tenant;
+    private String pulsarTenant;
 
     @ApiModelProperty(value = "Queue model, parallel: multiple partitions, 
high throughput, out-of-order messages;"
             + "serial: single partition, low throughput, and orderly messages")
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarTopicInfo.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarTopicInfo.java
index c1a955b339..835557fcce 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarTopicInfo.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarTopicInfo.java
@@ -39,7 +39,7 @@ import java.util.List;
 public class InlongPulsarTopicInfo extends InlongGroupTopicInfo {
 
     @ApiModelProperty(value = "Pulsar tenant")
-    private String tenant;
+    private String pulsarTenant;
 
     @ApiModelProperty(value = "Pulsar namespace")
     private String namespace;
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/cls/ClsDataNodeDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/cls/ClsDataNodeDTO.java
new file mode 100644
index 0000000000..dc0d29926b
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/cls/ClsDataNodeDTO.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.cls;
+
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * Cloud log service data node info
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@ApiModel("Cloud log service data node info")
+public class ClsDataNodeDTO {
+
+    @ApiModelProperty("Cloud log service master account")
+    private String mainAccountId;
+
+    @ApiModelProperty("Cloud log service sub account")
+    private String subAccountId;
+
+    @ApiModelProperty("Cloud log service send api secretKey")
+    private String sendSecretKey;
+
+    @ApiModelProperty("Cloud log service send api secretId")
+    private String sendSecretId;
+
+    @ApiModelProperty("Cloud log service manage api secretKey")
+    private String manageSecretKey;
+
+    @ApiModelProperty("Cloud log service manage api secretId")
+    private String manageSecretId;
+
+    @ApiModelProperty("Cloud log service endpoint")
+    private String endpoint;
+
+    @ApiModelProperty("Cloud log service region")
+    private String region;
+
+    @ApiModelProperty("Cloud log service log set id")
+    private String logSetId;
+
+    /**
+     * Get the dto instance from the request
+     */
+    public static ClsDataNodeDTO getFromRequest(ClsDataNodeRequest request, 
String extParams) {
+        ClsDataNodeDTO dto = StringUtils.isNotBlank(extParams)
+                ? ClsDataNodeDTO.getFromJson(extParams)
+                : new ClsDataNodeDTO();
+        return CommonBeanUtils.copyProperties(request, dto, true);
+    }
+
+    /**
+     * Get the dto instance from the JSON string.
+     */
+    public static ClsDataNodeDTO getFromJson(@NotNull String extParams) {
+        try {
+            return JsonUtils.parseObject(extParams, ClsDataNodeDTO.class);
+        } catch (Exception e) {
+            throw new BusinessException(ErrorCodeEnum.GROUP_INFO_INCORRECT,
+                    String.format("Failed to parse extParams for Cloud log 
service node: %s", e.getMessage()));
+        }
+    }
+
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/cls/ClsDataNodeInfo.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/cls/ClsDataNodeInfo.java
new file mode 100644
index 0000000000..083dc43558
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/cls/ClsDataNodeInfo.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.cls;
+
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
+import org.apache.inlong.manager.pojo.node.DataNodeRequest;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import lombok.experimental.SuperBuilder;
+
+/**
+ * Cloud log service data node info
+ */
+@Data
+@SuperBuilder
+@AllArgsConstructor
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = DataNodeType.ELASTICSEARCH)
+@ApiModel("Cloud log service data node info")
+public class ClsDataNodeInfo extends DataNodeInfo {
+
+    @ApiModelProperty("Cloud log service master account")
+    private String mainAccountId;
+
+    @ApiModelProperty("Cloud log service sub account")
+    private String subAccountId;
+
+    @ApiModelProperty("Cloud log service send api secretKey")
+    private String sendSecretKey;
+
+    @ApiModelProperty("Cloud log service send api secretId")
+    private String sendSecretId;
+
+    @ApiModelProperty("Cloud log service manage api secretKey")
+    private String manageSecretKey;
+
+    @ApiModelProperty("Cloud log service manage api secretId")
+    private String manageSecretId;
+
+    @ApiModelProperty("Cloud log service endpoint")
+    private String endpoint;
+
+    @ApiModelProperty("Cloud log service region")
+    private String region;
+
+    @ApiModelProperty("Cloud log service log set id")
+    private String logSetId;
+
+    public ClsDataNodeInfo() {
+        setType(DataNodeType.CLS);
+    }
+
+    @Override
+    public DataNodeRequest genRequest() {
+        return CommonBeanUtils.copyProperties(this, ClsDataNodeRequest::new);
+    }
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/cls/ClsDataNodeRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/cls/ClsDataNodeRequest.java
new file mode 100644
index 0000000000..7db2392cc9
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/cls/ClsDataNodeRequest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.cls;
+
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.node.DataNodeRequest;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+/**
+ * Cloud log service data node request
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = DataNodeType.CLS)
+@ApiModel("Cloud log service data node request")
+public class ClsDataNodeRequest extends DataNodeRequest {
+
+    @ApiModelProperty("Cloud log service master account")
+    private String mainAccountId;
+
+    @ApiModelProperty("Cloud log service sub account")
+    private String subAccountId;
+
+    @ApiModelProperty("Cloud log service send api secretKey")
+    private String sendSecretKey;
+
+    @ApiModelProperty("Cloud log service send api secretId")
+    private String sendSecretId;
+
+    @ApiModelProperty("Cloud log service manage api secretKey")
+    private String manageSecretKey;
+
+    @ApiModelProperty("Cloud log service manage api secretId")
+    private String manageSecretId;
+
+    @ApiModelProperty("Cloud log service endpoint")
+    private String endpoint;
+
+    @ApiModelProperty("Cloud log service region")
+    private String region;
+
+    @ApiModelProperty("Cloud log service log set id")
+    private String logSetId;
+
+    public ClsDataNodeRequest() {
+        this.setType(DataNodeType.CLS);
+    }
+
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/cls/ClsSink.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/cls/ClsSink.java
new file mode 100644
index 0000000000..5ab92728fd
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/cls/ClsSink.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.cls;
+
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import lombok.experimental.SuperBuilder;
+
+/**
+ * Cloud log service sink info
+ */
+@Data
+@SuperBuilder
+@AllArgsConstructor
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "Cloud log service sink info")
+@JsonTypeDefine(value = SinkType.CLS)
+public class ClsSink extends StreamSink {
+
+    @ApiModelProperty("Cloud log service topic id")
+    private String topicId;
+
+    @ApiModelProperty("Cloud log service topic name")
+    private String topicName;
+
+    @ApiModelProperty("Cloud log service api send secretKey")
+    private String sendSecretKey;
+
+    @ApiModelProperty("Cloud log service api send secretId")
+    private String sendSecretId;
+
+    @ApiModelProperty("Cloud log service api manage secretKey")
+    private String manageSecretKey;
+
+    @ApiModelProperty("Cloud log service api manage secretId")
+    private String manageSecretId;
+
+    @ApiModelProperty("Cloud log service endpoint")
+    private String endpoint;
+
+    @ApiModelProperty("Cloud log service log set id")
+    private String logSetId;
+
+    @ApiModelProperty("Cloud log service tag name")
+    private String tag;
+
+    @ApiModelProperty("Cloud log service master account")
+    private String mainAccountId;
+
+    @ApiModelProperty("Cloud log service subAccount")
+    private String subAccountId;
+
+    @ApiModelProperty("Cloud log service index tokenizer")
+    private String tokenizer;
+
+    public ClsSink() {
+        this.setSinkType(SinkType.CLS);
+    }
+
+    @Override
+    public SinkRequest genSinkRequest() {
+        return CommonBeanUtils.copyProperties(this, ClsSinkRequest::new);
+    }
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/cls/ClsSinkDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/cls/ClsSinkDTO.java
new file mode 100644
index 0000000000..3acd1e0372
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/cls/ClsSinkDTO.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.cls;
+
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
+
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * Sink info of Cloud log service
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class ClsSinkDTO {
+
+    @ApiModelProperty("Cloud log service topic id")
+    private String topicId;
+
+    @ApiModelProperty("Cloud log service topic name")
+    private String topicName;
+
+    @ApiModelProperty("Cloud log service topic save time")
+    private Integer saveTime;
+
+    @ApiModelProperty("Cloud log service tag name")
+    private String tag;
+
+    @ApiModelProperty("Cloud log service index tokenizer")
+    private String tokenizer;
+
+    /**
+     * Get the dto instance from the request
+     */
+    public static ClsSinkDTO getFromRequest(ClsSinkRequest request, String 
extParams) {
+        ClsSinkDTO dto = StringUtils.isNotBlank(extParams)
+                ? ClsSinkDTO.getFromJson(extParams)
+                : new ClsSinkDTO();
+        return CommonBeanUtils.copyProperties(request, dto, true);
+    }
+
+    public static ClsSinkDTO getFromJson(@NotNull String extParams) {
+        try {
+            return JsonUtils.parseObject(extParams, ClsSinkDTO.class);
+        } catch (Exception e) {
+            throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
+                    String.format("parse extParams of CLS SinkDTO failure: 
%s", e.getMessage()));
+        }
+    }
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarTopicInfo.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/cls/ClsSinkRequest.java
similarity index 58%
copy from 
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarTopicInfo.java
copy to 
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/cls/ClsSinkRequest.java
index c1a955b339..d962b3e9ac 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarTopicInfo.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/cls/ClsSinkRequest.java
@@ -15,40 +15,37 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.pojo.group.pulsar;
+package org.apache.inlong.manager.pojo.sink.cls;
 
-import org.apache.inlong.common.constant.MQType;
+import org.apache.inlong.manager.common.consts.SinkType;
 import org.apache.inlong.manager.common.util.JsonTypeDefine;
-import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
 
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
-import lombok.AllArgsConstructor;
-import lombok.Builder;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
+import lombok.ToString;
 
-import java.util.List;
-
+/**
+ * Cloud log service sink request.
+ */
 @Data
-@Builder
-@AllArgsConstructor
+@ToString(callSuper = true)
 @EqualsAndHashCode(callSuper = true)
-@JsonTypeDefine(value = MQType.PULSAR)
-@ApiModel("Inlong pulsar group topic info")
-public class InlongPulsarTopicInfo extends InlongGroupTopicInfo {
-
-    @ApiModelProperty(value = "Pulsar tenant")
-    private String tenant;
+@ApiModel(value = "Cloud log service sink request")
+@JsonTypeDefine(value = SinkType.CLS)
+public class ClsSinkRequest extends SinkRequest {
 
-    @ApiModelProperty(value = "Pulsar namespace")
-    private String namespace;
+    @ApiModelProperty("Cloud log service topic name")
+    private String topicName;
 
-    @ApiModelProperty(value = "Pulsar topics")
-    private List<String> topics;
+    @ApiModelProperty("Cloud log service topic save time")
+    private Integer saveTime;
 
-    public InlongPulsarTopicInfo() {
-        this.setMqType(MQType.PULSAR);
-    }
+    @ApiModelProperty("Cloud log service tag name")
+    private String tag;
 
+    @ApiModelProperty("Cloud log service index tokenizer")
+    private String tokenizer;
 }
diff --git a/inlong-manager/manager-service/pom.xml 
b/inlong-manager/manager-service/pom.xml
index 0b1c499601..fe32b68874 100644
--- a/inlong-manager/manager-service/pom.xml
+++ b/inlong-manager/manager-service/pom.xml
@@ -152,7 +152,10 @@
                 </exclusion>
             </exclusions>
         </dependency>
-
+        <dependency>
+            <groupId>com.tencentcloudapi</groupId>
+            <artifactId>tencentcloud-sdk-java-cls</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.apache.hive</groupId>
             <artifactId>hive-jdbc</artifactId>
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator4Pulsar.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator4Pulsar.java
index e39561a443..29eb364e9f 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator4Pulsar.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator4Pulsar.java
@@ -129,7 +129,7 @@ public class InlongGroupOperator4Pulsar extends 
AbstractGroupOperator {
             tenant = pulsarCluster.getPulsarTenant();
         }
         InlongPulsarTopicInfo topicInfo = new InlongPulsarTopicInfo();
-        topicInfo.setTenant(tenant);
+        topicInfo.setPulsarTenant(tenant);
         topicInfo.setNamespace(groupInfo.getMqResource());
         // each inlong stream is associated with a Pulsar topic
         List<String> topics = 
streamService.getTopicList(groupInfo.getInlongGroupId()).stream()
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/cls/ClsDataNodeOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/cls/ClsDataNodeOperator.java
new file mode 100644
index 0000000000..40ec26f210
--- /dev/null
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/cls/ClsDataNodeOperator.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.node.cls;
+
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.DataNodeEntity;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
+import org.apache.inlong.manager.pojo.node.DataNodeRequest;
+import org.apache.inlong.manager.pojo.node.cls.ClsDataNodeDTO;
+import org.apache.inlong.manager.pojo.node.cls.ClsDataNodeInfo;
+import org.apache.inlong.manager.pojo.node.cls.ClsDataNodeRequest;
+import org.apache.inlong.manager.service.node.AbstractDataNodeOperator;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.tencentcloudapi.cls.v20201016.ClsClient;
+import com.tencentcloudapi.cls.v20201016.models.DescribeTopicsRequest;
+import com.tencentcloudapi.common.Credential;
+import com.tencentcloudapi.common.exception.TencentCloudSDKException;
+import com.tencentcloudapi.common.profile.ClientProfile;
+import com.tencentcloudapi.common.profile.HttpProfile;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+@Service
+public class ClsDataNodeOperator extends AbstractDataNodeOperator {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(ClsDataNodeOperator.class);
+
+    @Autowired
+    private ObjectMapper objectMapper;
+
+    @Override
+    protected void setTargetEntity(DataNodeRequest request, DataNodeEntity 
targetEntity) {
+        ClsDataNodeRequest clsDataNodeRequest = (ClsDataNodeRequest) request;
+        CommonBeanUtils.copyProperties(clsDataNodeRequest, targetEntity, true);
+        try {
+            ClsDataNodeDTO dto = 
ClsDataNodeDTO.getFromRequest(clsDataNodeRequest, targetEntity.getExtParams());
+            targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
+        } catch (Exception e) {
+            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
+                    String.format("Failed to build extParams for Cloud log 
service node: %s", e.getMessage()));
+        }
+    }
+
+    @Override
+    public Boolean accept(String dataNodeType) {
+        return DataNodeType.CLS.equals(dataNodeType);
+    }
+
+    @Override
+    public String getDataNodeType() {
+        return DataNodeType.CLS;
+    }
+
+    @Override
+    public DataNodeInfo getFromEntity(DataNodeEntity entity) {
+        if (entity == null) {
+            throw new BusinessException(ErrorCodeEnum.DATA_NODE_NOT_FOUND);
+        }
+        ClsDataNodeInfo info = new ClsDataNodeInfo();
+        CommonBeanUtils.copyProperties(entity, info);
+        if (StringUtils.isNotBlank(entity.getExtParams())) {
+            ClsDataNodeDTO dto = 
ClsDataNodeDTO.getFromJson(entity.getExtParams());
+            CommonBeanUtils.copyProperties(dto, info);
+        }
+        return info;
+    }
+
+    @Override
+    public Boolean testConnection(DataNodeRequest request) {
+        ClsDataNodeRequest dataNodeRequest = (ClsDataNodeRequest) request;
+        Credential cred = new Credential(dataNodeRequest.getManageSecretId(), 
dataNodeRequest.getManageSecretId());
+        HttpProfile httpProfile = new HttpProfile();
+        httpProfile.setEndpoint(dataNodeRequest.getEndpoint());
+        ClientProfile clientProfile = new ClientProfile();
+        clientProfile.setHttpProfile(httpProfile);
+        ClsClient client = new ClsClient(cred, dataNodeRequest.getRegion(), 
clientProfile);
+        DescribeTopicsRequest req = new DescribeTopicsRequest();
+        try {
+            client.DescribeTopics(req);
+        } catch (TencentCloudSDKException e) {
+            String errMsg = String.format("connect tencent cloud error 
endPoint = %s secretId = %s secretKey = %s",
+                    dataNodeRequest.getEndpoint(), 
dataNodeRequest.getManageSecretId(),
+                    dataNodeRequest.getManageSecretKey());
+            LOGGER.error(errMsg, e);
+            throw new BusinessException(errMsg);
+        }
+        return true;
+    }
+}
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/cls/ClsResourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/cls/ClsResourceOperator.java
new file mode 100644
index 0000000000..e69f298ffe
--- /dev/null
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/cls/ClsResourceOperator.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.resource.sink.cls;
+
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.enums.SinkStatus;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.dao.entity.DataNodeEntity;
+import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
+import org.apache.inlong.manager.dao.mapper.DataNodeEntityMapper;
+import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
+import org.apache.inlong.manager.pojo.node.cls.ClsDataNodeDTO;
+import org.apache.inlong.manager.pojo.sink.SinkInfo;
+import org.apache.inlong.manager.pojo.sink.cls.ClsSinkDTO;
+import org.apache.inlong.manager.service.resource.sink.SinkResourceOperator;
+import org.apache.inlong.manager.service.sink.StreamSinkService;
+
+import com.tencentcloudapi.cls.v20201016.ClsClient;
+import com.tencentcloudapi.cls.v20201016.models.CreateIndexRequest;
+import com.tencentcloudapi.cls.v20201016.models.CreateTopicRequest;
+import com.tencentcloudapi.cls.v20201016.models.CreateTopicResponse;
+import com.tencentcloudapi.cls.v20201016.models.FullTextInfo;
+import com.tencentcloudapi.cls.v20201016.models.RuleInfo;
+import com.tencentcloudapi.cls.v20201016.models.Tag;
+import com.tencentcloudapi.common.Credential;
+import com.tencentcloudapi.common.exception.TencentCloudSDKException;
+import com.tencentcloudapi.common.profile.ClientProfile;
+import com.tencentcloudapi.common.profile.HttpProfile;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+@Service
+public class ClsResourceOperator implements SinkResourceOperator {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ClsResourceOperator.class);
+
+    @Autowired
+    private DataNodeEntityMapper dataNodeEntityMapper;
+    @Autowired
+    private StreamSinkService sinkService;
+    @Autowired
+    private StreamSinkEntityMapper streamSinkEntityMapper;
+
+    @Override
+    public Boolean accept(String sinkType) {
+        return SinkType.CLS.equals(sinkType);
+    }
+
+    @Override
+    public void createSinkResource(SinkInfo sinkInfo) {
+        LOG.info("begin to create sink resources sinkId={}", sinkInfo.getId());
+        if 
(SinkStatus.CONFIG_SUCCESSFUL.getCode().equals(sinkInfo.getStatus())) {
+            LOG.warn("sink resource [" + sinkInfo.getId() + "] already 
success, skip to create");
+            return;
+        } else if 
(InlongConstants.DISABLE_CREATE_RESOURCE.equals(sinkInfo.getEnableCreateResource()))
 {
+            LOG.warn("create resource was disabled, skip to create for [" + 
sinkInfo.getId() + "]");
+            return;
+        }
+        this.createTopicID(sinkInfo);
+    }
+
+    /**
+     * Create cloud log service topic
+     */
+    private void createTopicID(SinkInfo sinkInfo) {
+        ClsDataNodeDTO clsDataNode = getClsDataNode(sinkInfo);
+        ClsSinkDTO clsSinkDTO = JsonUtils.parseObject(sinkInfo.getExtParams(), 
ClsSinkDTO.class);
+        try {
+            ClsClient client = getClsClient(clsDataNode);
+            CreateTopicRequest req = getCreateTopicRequest(clsDataNode, 
clsSinkDTO);
+            CreateTopicResponse resp = client.CreateTopic(req);
+            LOG.info("create cls topic {} success ,topicId {}", 
clsSinkDTO.getTopicName(), resp.getTopicId());
+            // update set topic id into sink info
+            clsSinkDTO.setTopicId(resp.getTopicId());
+            sinkInfo.setExtParams(JsonUtils.toJsonString(clsSinkDTO));
+            // create topic index by tokenizer
+            this.createTopicIndex(sinkInfo);
+            StreamSinkEntity streamSinkEntity = new StreamSinkEntity();
+            CommonBeanUtils.copyProperties(sinkInfo, streamSinkEntity, true);
+            streamSinkEntityMapper.updateByIdSelective(streamSinkEntity);
+            String info = "success to create cls resource";
+            sinkService.updateStatus(sinkInfo.getId(), 
SinkStatus.CONFIG_SUCCESSFUL.getCode(), info);
+            LOG.info("update cls sink = {}info status  success ,topicName {}", 
streamSinkEntity.getSinkName(),
+                    clsSinkDTO.getTopicName());
+        } catch (TencentCloudSDKException e) {
+            String errMsg = "Create cls topic  failed: " + e.getMessage();
+            LOG.error(errMsg, e);
+            sinkService.updateStatus(sinkInfo.getId(), 
SinkStatus.CONFIG_FAILED.getCode(), errMsg);
+            throw new BusinessException(errMsg);
+        }
+    }
+
+    private CreateTopicRequest getCreateTopicRequest(ClsDataNodeDTO 
clsDataNode, ClsSinkDTO clsSinkDTO) {
+        CreateTopicRequest req = new CreateTopicRequest();
+        String[] allTags = 
clsSinkDTO.getTag().split(InlongConstants.CENTER_LINE);
+        req.setTags(convertTags(allTags));
+        req.setLogsetId(clsDataNode.getLogSetId());
+        req.setTopicName(clsSinkDTO.getTopicName());
+        return req;
+    }
+
+    private ClsClient getClsClient(ClsDataNodeDTO clsDataNode) {
+        Credential cred = new Credential(clsDataNode.getManageSecretId(),
+                clsDataNode.getManageSecretId());
+        HttpProfile httpProfile = new HttpProfile();
+        httpProfile.setEndpoint(clsDataNode.getEndpoint());
+        ClientProfile clientProfile = new ClientProfile();
+
+        clientProfile.setHttpProfile(httpProfile);
+        return new ClsClient(cred, clsDataNode.getRegion(), clientProfile);
+    }
+
+    /**
+     * Create topic index by tokenizer
+     */
+    private void createTopicIndex(SinkInfo sinkInfo) throws BusinessException {
+        ClsSinkDTO clsSinkDTO = JsonUtils.parseObject(sinkInfo.getExtParams(), 
ClsSinkDTO.class);
+        if (StringUtils.isNotBlank(clsSinkDTO.getTokenizer())) {
+            LOG.warn("topic {} tokenizer is empty", clsSinkDTO.getTopicName());
+            return;
+        }
+        ClsDataNodeDTO clsDataNode = getClsDataNode(sinkInfo);
+        ClsClient clsClient = getClsClient(clsDataNode);
+        RuleInfo ruleInfo = new RuleInfo();
+        FullTextInfo fullTextInfo = new FullTextInfo();
+        fullTextInfo.setTokenizer(clsSinkDTO.getTokenizer());
+        ruleInfo.setFullText(fullTextInfo);
+
+        CreateIndexRequest req = new CreateIndexRequest();
+        req.setTopicId(clsSinkDTO.getTopicId());
+        req.setRule(ruleInfo);
+        try {
+            clsClient.CreateIndex(req);
+        } catch (TencentCloudSDKException e) {
+            String errMsg = "Create cls topic index failed: " + e.getMessage();
+            LOG.error(errMsg, e);
+            sinkService.updateStatus(sinkInfo.getId(), 
SinkStatus.CONFIG_FAILED.getCode(), errMsg);
+            throw new BusinessException(errMsg);
+        }
+        LOG.info("topic {} create index success tokenizer is {}", 
clsSinkDTO.getTopicName(), clsSinkDTO.getTokenizer());
+    }
+
+    private Tag[] convertTags(String[] allTags) {
+        Tag[] tags = new Tag[allTags.length];
+        for (int i = 0; i < allTags.length; i++) {
+            String tag = allTags[i];
+            String[] keyAndValueOfTag = tag.split(InlongConstants.COLON);
+            Tag tagInfo = new Tag();
+            tagInfo.set(keyAndValueOfTag[0], keyAndValueOfTag[1]);
+            tags[i] = tagInfo;
+        }
+        return tags;
+    }
+
+    private ClsDataNodeDTO getClsDataNode(SinkInfo sinkInfo) {
+        DataNodeEntity dataNodeEntity = 
dataNodeEntityMapper.selectByUniqueKey(sinkInfo.getDataNodeName(),
+                DataNodeType.CLS);
+        return JsonUtils.parseObject(dataNodeEntity.getExtParams(), 
ClsDataNodeDTO.class);
+    }
+
+}
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/pulsar/PulsarResourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/pulsar/PulsarResourceOperator.java
index 8f5935f8e4..0d6df611ea 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/pulsar/PulsarResourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/pulsar/PulsarResourceOperator.java
@@ -35,6 +35,7 @@ import 
org.apache.inlong.manager.service.node.DataNodeOperateHelper;
 import org.apache.inlong.manager.service.resource.queue.pulsar.PulsarOperator;
 import org.apache.inlong.manager.service.resource.queue.pulsar.PulsarUtils;
 import org.apache.inlong.manager.service.resource.sink.SinkResourceOperator;
+import org.apache.inlong.manager.service.sink.StreamSinkService;
 
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -56,6 +57,8 @@ public class PulsarResourceOperator implements 
SinkResourceOperator {
     private DataNodeOperateHelper dataNodeHelper;
     @Autowired
     private PulsarOperator pulsarOperator;
+    @Autowired
+    private StreamSinkService sinkService;
 
     @Override
     public Boolean accept(String sinkType) {
@@ -99,6 +102,9 @@ public class PulsarResourceOperator implements 
SinkResourceOperator {
                     .build();
             // create topic
             pulsarOperator.createTopic(pulsarAdmin, topicInfo);
+            final String info = "success to create Pulsar resource";
+            sinkService.updateStatus(sinkInfo.getId(), 
SinkStatus.CONFIG_SUCCESSFUL.getCode(), info);
+            LOG.info(info + " for sinkInfo={}", sinkInfo);
         } catch (PulsarClientException | PulsarAdminException e) {
             LOG.error("init pulsar admin error", e);
             throw new BusinessException();
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/cls/ClsSinkOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/cls/ClsSinkOperator.java
new file mode 100644
index 0000000000..ad8f4c69c1
--- /dev/null
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/cls/ClsSinkOperator.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.sink.cls;
+
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.dao.entity.DataNodeEntity;
+import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
+import org.apache.inlong.manager.dao.mapper.DataNodeEntityMapper;
+import org.apache.inlong.manager.pojo.node.cls.ClsDataNodeDTO;
+import org.apache.inlong.manager.pojo.sink.SinkField;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.sink.cls.ClsSink;
+import org.apache.inlong.manager.pojo.sink.cls.ClsSinkDTO;
+import org.apache.inlong.manager.pojo.sink.cls.ClsSinkRequest;
+import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Cloud log service sink operator
+ */
+@Service
+public class ClsSinkOperator extends AbstractSinkOperator {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(ClsSinkOperator.class);
+    private static final String KEY_FIELDS = "fieldNames";
+    private static final String SECRET_KEY = "secretKey";
+    private static final String SECRET_ID = "secretId";
+    private static final String END_POINT = "endpoint";
+    private static final String TOPIC_ID = "topicId";
+
+    @Autowired
+    private ObjectMapper objectMapper;
+    @Autowired
+    private DataNodeEntityMapper dataNodeEntityMapper;
+
+    @Override
+    protected void setTargetEntity(SinkRequest request, StreamSinkEntity 
targetEntity) {
+        if (!this.getSinkType().equals(request.getSinkType())) {
+            throw new BusinessException(ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT,
+                    ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + 
getSinkType());
+        }
+        ClsSinkRequest sinkRequest = (ClsSinkRequest) request;
+        try {
+            ClsSinkDTO dto = ClsSinkDTO.getFromRequest(sinkRequest, 
targetEntity.getExtParams());
+            targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
+        } catch (Exception e) {
+            throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
+                    String.format("serialize extParams of Doris SinkDTO 
failure: %s", e.getMessage()));
+        }
+    }
+
+    @Override
+    protected String getSinkType() {
+        return SinkType.CLS;
+    }
+
+    @Override
+    public Boolean accept(String sinkType) {
+        return SinkType.CLS.equals(sinkType);
+    }
+
+    @Override
+    public StreamSink getFromEntity(StreamSinkEntity entity) {
+        ClsSink sink = new ClsSink();
+        if (entity == null) {
+            return sink;
+        }
+
+        ClsSinkDTO dto = ClsSinkDTO.getFromJson(entity.getExtParams());
+        DataNodeEntity dataNodeEntity = 
dataNodeEntityMapper.selectByUniqueKey(entity.getDataNodeName(),
+                DataNodeType.CLS);
+        ClsDataNodeDTO clsDataNodeDTO = 
JsonUtils.parseObject(dataNodeEntity.getExtParams(),
+                ClsDataNodeDTO.class);
+        CommonBeanUtils.copyProperties(entity, sink, true);
+        CommonBeanUtils.copyProperties(dto, sink, true);
+        CommonBeanUtils.copyProperties(clsDataNodeDTO, sink, true);
+        List<SinkField> sinkFields = getSinkFields(entity.getId());
+        sink.setSinkFieldList(sinkFields);
+        return sink;
+    }
+
+    @Override
+    public Map<String, String> parse2IdParams(StreamSinkEntity streamSink, 
List<String> fields) {
+        Map<String, String> params = super.parse2IdParams(streamSink, fields);
+        ClsSinkDTO clsSinkDTO = 
JsonUtils.parseObject(streamSink.getExtParams(), ClsSinkDTO.class);
+        params.put(TOPIC_ID, clsSinkDTO.getTopicId());
+        DataNodeEntity dataNodeEntity = 
dataNodeEntityMapper.selectByUniqueKey(streamSink.getDataNodeName(),
+                DataNodeType.CLS);
+        ClsDataNodeDTO clsDataNodeDTO = 
JsonUtils.parseObject(dataNodeEntity.getExtParams(),
+                ClsDataNodeDTO.class);
+        params.put(SECRET_ID, clsDataNodeDTO.getSendSecretId());
+        params.put(SECRET_KEY, clsDataNodeDTO.getSendSecretKey());
+        params.put(END_POINT, clsDataNodeDTO.getEndpoint());
+        StringBuilder fieldNames = new StringBuilder();
+        for (String field : fields) {
+            fieldNames.append(field).append(InlongConstants.BLANK);
+        }
+        params.put(KEY_FIELDS, fieldNames.toString());
+        return params;
+    }
+}
diff --git a/licenses/inlong-manager/LICENSE b/licenses/inlong-manager/LICENSE
index d14e3fd5b4..2643cf219a 100644
--- a/licenses/inlong-manager/LICENSE
+++ b/licenses/inlong-manager/LICENSE
@@ -681,6 +681,7 @@ The text of each license is also included at 
licenses/LICENSE-[project].txt.
   org.springframework:spring-webmvc:5.3.18 - Spring Web MVC 
(https://github.com/spring-projects/spring-framework), (Apache License, Version 
2.0)
   com.typesafe:ssl-config-core_2.11:0.3.7 - ssl-config-core 
(https://github.com/lightbend/ssl-config/tree/v0.3.7), (Apache-2.0)
   org.apache.tomcat.embed:tomcat-embed-core:9.0.60 - tomcat-embed-core 
(https://tomcat.apache.org/), (Apache License, Version 2.0)
+  com.tencentcloudapi:tencentcloud-sdk-java-cls:3.1.830 - 
tencentcloud-sdk-java-cls 
(https://github.com/TencentCloud/tencentcloud-sdk-java-cls/tree/v3.1.830), 
(Apache License, Version 2.0)
 
 ========================================================================
 BSD licenses
diff --git a/pom.xml b/pom.xml
index ca8a551c21..576f0c73e6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -203,6 +203,7 @@
         <jedis.version>2.9.0</jedis.version>
         <poi.version>5.2.3</poi.version>
         <otel.version>1.28.0</otel.version>
+        <tencentcloud-api.version>3.1.830</tencentcloud-api.version>
         <woodstox-core.version>5.4.0</woodstox-core.version>
     </properties>
 
@@ -1261,6 +1262,11 @@
                 <artifactId>poi-ooxml</artifactId>
                 <version>${poi.version}</version>
             </dependency>
+            <dependency>
+                <groupId>com.tencentcloudapi</groupId>
+                <artifactId>tencentcloud-sdk-java-cls</artifactId>
+                <version>${tencentcloud-api.version}</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 

Reply via email to