This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 25e03cedea [INLONG-8845][Manager] Support Tencent Cloud Log Service
data flow (#8892)
25e03cedea is described below
commit 25e03cedea49939072fde3bfb3703ca4ab085cfc
Author: castor <[email protected]>
AuthorDate: Sun Sep 17 22:01:01 2023 +0800
[INLONG-8845][Manager] Support Tencent Cloud Log Service data flow (#8892)
---
.../inlong/manager/common/consts/DataNodeType.java | 5 +
.../manager/common/consts/InlongConstants.java | 2 +
.../inlong/manager/common/consts/SinkType.java | 5 +
.../pojo/group/pulsar/InlongPulsarRequest.java | 2 +-
.../pojo/group/pulsar/InlongPulsarTopicInfo.java | 2 +-
.../manager/pojo/node/cls/ClsDataNodeDTO.java | 94 +++++++++++
.../manager/pojo/node/cls/ClsDataNodeInfo.java | 81 +++++++++
.../manager/pojo/node/cls/ClsDataNodeRequest.java | 71 ++++++++
.../inlong/manager/pojo/sink/cls/ClsSink.java | 90 ++++++++++
.../inlong/manager/pojo/sink/cls/ClsSinkDTO.java | 76 +++++++++
.../cls/ClsSinkRequest.java} | 41 +++--
inlong-manager/manager-service/pom.xml | 5 +-
.../service/group/InlongGroupOperator4Pulsar.java | 2 +-
.../service/node/cls/ClsDataNodeOperator.java | 111 +++++++++++++
.../resource/sink/cls/ClsResourceOperator.java | 183 +++++++++++++++++++++
.../sink/pulsar/PulsarResourceOperator.java | 6 +
.../manager/service/sink/cls/ClsSinkOperator.java | 131 +++++++++++++++
licenses/inlong-manager/LICENSE | 1 +
pom.xml | 6 +
19 files changed, 888 insertions(+), 26 deletions(-)
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
index 6e1002612c..e14a98195f 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
@@ -36,5 +36,10 @@ public class DataNodeType {
public static final String ORACLE = "ORACLE";
public static final String SQLSERVER = "SQLSERVER";
public static final String MONGODB = "MONGODB";
+ /**
+ * Tencent cloud log service
+ * Details: <a href="https://www.tencentcloud.com/products/cls">CLS</a>
+ */
+ public static final String CLS = "CLS";
public static final String PULSAR = "PULSAR";
}
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
index 3683280c6d..7772ef4ba0 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
@@ -60,6 +60,8 @@ public class InlongConstants {
public static final String UNDERSCORE = "_";
+ public static final String CENTER_LINE = "\\|";
+
public static final String LEFT_BRACKET = "(";
public static final String PERCENT = "%";
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
index f11e9daf6d..965e809f51 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
@@ -34,4 +34,9 @@ public class SinkType extends StreamType {
public static final String STARROCKS = "STARROCKS";
public static final String KUDU = "KUDU";
public static final String REDIS = "REDIS";
+ /**
+ * Tencent cloud log service
+ * Details: <a href="https://www.tencentcloud.com/products/cls">CLS</a>
+ */
+ public static final String CLS = "CLS";
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarRequest.java
index 443a5af472..9d93e39ee5 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarRequest.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarRequest.java
@@ -42,7 +42,7 @@ public class InlongPulsarRequest extends InlongGroupRequest {
* from {@link
org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterRequest}
*/
@ApiModelProperty(value = "Pulsar tenant")
- private String tenant;
+ private String pulsarTenant;
@ApiModelProperty(value = "Queue model, parallel: multiple partitions,
high throughput, out-of-order messages;"
+ "serial: single partition, low throughput, and orderly messages")
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarTopicInfo.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarTopicInfo.java
index c1a955b339..835557fcce 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarTopicInfo.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarTopicInfo.java
@@ -39,7 +39,7 @@ import java.util.List;
public class InlongPulsarTopicInfo extends InlongGroupTopicInfo {
@ApiModelProperty(value = "Pulsar tenant")
- private String tenant;
+ private String pulsarTenant;
@ApiModelProperty(value = "Pulsar namespace")
private String namespace;
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/cls/ClsDataNodeDTO.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/cls/ClsDataNodeDTO.java
new file mode 100644
index 0000000000..dc0d29926b
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/cls/ClsDataNodeDTO.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.cls;
+
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * Cloud log service data node info
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@ApiModel("Cloud log service data node info")
+public class ClsDataNodeDTO {
+
+ @ApiModelProperty("Cloud log service master account")
+ private String mainAccountId;
+
+ @ApiModelProperty("Cloud log service sub account")
+ private String subAccountId;
+
+ @ApiModelProperty("Cloud log service send api secretKey")
+ private String sendSecretKey;
+
+ @ApiModelProperty("Cloud log service send api secretId")
+ private String sendSecretId;
+
+ @ApiModelProperty("Cloud log service manage api secretKey")
+ private String manageSecretKey;
+
+ @ApiModelProperty("Cloud log service manage api secretId")
+ private String manageSecretId;
+
+ @ApiModelProperty("Cloud log service endpoint")
+ private String endpoint;
+
+ @ApiModelProperty("Cloud log service region")
+ private String region;
+
+ @ApiModelProperty("Cloud log service log set id")
+ private String logSetId;
+
+ /**
+ * Get the dto instance from the request
+ */
+ public static ClsDataNodeDTO getFromRequest(ClsDataNodeRequest request,
String extParams) {
+ ClsDataNodeDTO dto = StringUtils.isNotBlank(extParams)
+ ? ClsDataNodeDTO.getFromJson(extParams)
+ : new ClsDataNodeDTO();
+ return CommonBeanUtils.copyProperties(request, dto, true);
+ }
+
+ /**
+ * Get the dto instance from the JSON string.
+ */
+ public static ClsDataNodeDTO getFromJson(@NotNull String extParams) {
+ try {
+ return JsonUtils.parseObject(extParams, ClsDataNodeDTO.class);
+ } catch (Exception e) {
+ throw new BusinessException(ErrorCodeEnum.GROUP_INFO_INCORRECT,
+ String.format("Failed to parse extParams for Cloud log
service node: %s", e.getMessage()));
+ }
+ }
+
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/cls/ClsDataNodeInfo.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/cls/ClsDataNodeInfo.java
new file mode 100644
index 0000000000..083dc43558
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/cls/ClsDataNodeInfo.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.cls;
+
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
+import org.apache.inlong.manager.pojo.node.DataNodeRequest;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import lombok.experimental.SuperBuilder;
+
+/**
+ * Cloud log service data node info
+ */
+@Data
+@SuperBuilder
+@AllArgsConstructor
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = DataNodeType.ELASTICSEARCH)
+@ApiModel("Cloud log service data node info")
+public class ClsDataNodeInfo extends DataNodeInfo {
+
+ @ApiModelProperty("Cloud log service master account")
+ private String mainAccountId;
+
+ @ApiModelProperty("Cloud log service sub account")
+ private String subAccountId;
+
+ @ApiModelProperty("Cloud log service send api secretKey")
+ private String sendSecretKey;
+
+ @ApiModelProperty("Cloud log service send api secretId")
+ private String sendSecretId;
+
+ @ApiModelProperty("Cloud log service manage api secretKey")
+ private String manageSecretKey;
+
+ @ApiModelProperty("Cloud log service manage api secretId")
+ private String manageSecretId;
+
+ @ApiModelProperty("Cloud log service endpoint")
+ private String endpoint;
+
+ @ApiModelProperty("Cloud log service region")
+ private String region;
+
+ @ApiModelProperty("Cloud log service log set id")
+ private String logSetId;
+
+ public ClsDataNodeInfo() {
+ setType(DataNodeType.CLS);
+ }
+
+ @Override
+ public DataNodeRequest genRequest() {
+ return CommonBeanUtils.copyProperties(this, ClsDataNodeRequest::new);
+ }
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/cls/ClsDataNodeRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/cls/ClsDataNodeRequest.java
new file mode 100644
index 0000000000..7db2392cc9
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/cls/ClsDataNodeRequest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.cls;
+
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.node.DataNodeRequest;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+/**
+ * Cloud log service data node request
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = DataNodeType.CLS)
+@ApiModel("Cloud log service data node request")
+public class ClsDataNodeRequest extends DataNodeRequest {
+
+ @ApiModelProperty("Cloud log service master account")
+ private String mainAccountId;
+
+ @ApiModelProperty("Cloud log service sub account")
+ private String subAccountId;
+
+ @ApiModelProperty("Cloud log service send api secretKey")
+ private String sendSecretKey;
+
+ @ApiModelProperty("Cloud log service send api secretId")
+ private String sendSecretId;
+
+ @ApiModelProperty("Cloud log service manage api secretKey")
+ private String manageSecretKey;
+
+ @ApiModelProperty("Cloud log service manage api secretId")
+ private String manageSecretId;
+
+ @ApiModelProperty("Cloud log service endpoint")
+ private String endpoint;
+
+ @ApiModelProperty("Cloud log service region")
+ private String region;
+
+ @ApiModelProperty("Cloud log service log set id")
+ private String logSetId;
+
+ public ClsDataNodeRequest() {
+ this.setType(DataNodeType.CLS);
+ }
+
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/cls/ClsSink.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/cls/ClsSink.java
new file mode 100644
index 0000000000..5ab92728fd
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/cls/ClsSink.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.cls;
+
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import lombok.experimental.SuperBuilder;
+
+/**
+ * Cloud log service sink info
+ */
+@Data
+@SuperBuilder
+@AllArgsConstructor
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "Cloud log service sink info")
+@JsonTypeDefine(value = SinkType.CLS)
+public class ClsSink extends StreamSink {
+
+ @ApiModelProperty("Cloud log service topic id")
+ private String topicId;
+
+ @ApiModelProperty("Cloud log service topic name")
+ private String topicName;
+
+ @ApiModelProperty("Cloud log service api send secretKey")
+ private String sendSecretKey;
+
+ @ApiModelProperty("Cloud log service api send secretId")
+ private String sendSecretId;
+
+ @ApiModelProperty("Cloud log service api manage secretKey")
+ private String manageSecretKey;
+
+ @ApiModelProperty("Cloud log service api manage secretId")
+ private String manageSecretId;
+
+ @ApiModelProperty("Cloud log service endpoint")
+ private String endpoint;
+
+ @ApiModelProperty("Cloud log service log set id")
+ private String logSetId;
+
+ @ApiModelProperty("Cloud log service tag name")
+ private String tag;
+
+ @ApiModelProperty("Cloud log service master account")
+ private String mainAccountId;
+
+ @ApiModelProperty("Cloud log service subAccount")
+ private String subAccountId;
+
+ @ApiModelProperty("Cloud log service index tokenizer")
+ private String tokenizer;
+
+ public ClsSink() {
+ this.setSinkType(SinkType.CLS);
+ }
+
+ @Override
+ public SinkRequest genSinkRequest() {
+ return CommonBeanUtils.copyProperties(this, ClsSinkRequest::new);
+ }
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/cls/ClsSinkDTO.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/cls/ClsSinkDTO.java
new file mode 100644
index 0000000000..3acd1e0372
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/cls/ClsSinkDTO.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.cls;
+
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
+
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * Sink info of Cloud log service
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class ClsSinkDTO {
+
+ @ApiModelProperty("Cloud log service topic id")
+ private String topicId;
+
+ @ApiModelProperty("Cloud log service topic name")
+ private String topicName;
+
+ @ApiModelProperty("Cloud log service topic save time")
+ private Integer saveTime;
+
+ @ApiModelProperty("Cloud log service tag name")
+ private String tag;
+
+ @ApiModelProperty("Cloud log service index tokenizer")
+ private String tokenizer;
+
+ /**
+ * Get the dto instance from the request
+ */
+ public static ClsSinkDTO getFromRequest(ClsSinkRequest request, String
extParams) {
+ ClsSinkDTO dto = StringUtils.isNotBlank(extParams)
+ ? ClsSinkDTO.getFromJson(extParams)
+ : new ClsSinkDTO();
+ return CommonBeanUtils.copyProperties(request, dto, true);
+ }
+
+ public static ClsSinkDTO getFromJson(@NotNull String extParams) {
+ try {
+ return JsonUtils.parseObject(extParams, ClsSinkDTO.class);
+ } catch (Exception e) {
+ throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
+ String.format("parse extParams of CLS SinkDTO failure:
%s", e.getMessage()));
+ }
+ }
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarTopicInfo.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/cls/ClsSinkRequest.java
similarity index 58%
copy from
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarTopicInfo.java
copy to
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/cls/ClsSinkRequest.java
index c1a955b339..d962b3e9ac 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarTopicInfo.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/cls/ClsSinkRequest.java
@@ -15,40 +15,37 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.pojo.group.pulsar;
+package org.apache.inlong.manager.pojo.sink.cls;
-import org.apache.inlong.common.constant.MQType;
+import org.apache.inlong.manager.common.consts.SinkType;
import org.apache.inlong.manager.common.util.JsonTypeDefine;
-import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
-import lombok.AllArgsConstructor;
-import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
+import lombok.ToString;
-import java.util.List;
-
+/**
+ * Cloud log service sink request.
+ */
@Data
-@Builder
-@AllArgsConstructor
+@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
-@JsonTypeDefine(value = MQType.PULSAR)
-@ApiModel("Inlong pulsar group topic info")
-public class InlongPulsarTopicInfo extends InlongGroupTopicInfo {
-
- @ApiModelProperty(value = "Pulsar tenant")
- private String tenant;
+@ApiModel(value = "Cloud log service sink request")
+@JsonTypeDefine(value = SinkType.CLS)
+public class ClsSinkRequest extends SinkRequest {
- @ApiModelProperty(value = "Pulsar namespace")
- private String namespace;
+ @ApiModelProperty("Cloud log service topic name")
+ private String topicName;
- @ApiModelProperty(value = "Pulsar topics")
- private List<String> topics;
+ @ApiModelProperty("Cloud log service topic save time")
+ private Integer saveTime;
- public InlongPulsarTopicInfo() {
- this.setMqType(MQType.PULSAR);
- }
+ @ApiModelProperty("Cloud log service tag name")
+ private String tag;
+ @ApiModelProperty("Cloud log service index tokenizer")
+ private String tokenizer;
}
diff --git a/inlong-manager/manager-service/pom.xml
b/inlong-manager/manager-service/pom.xml
index 0b1c499601..fe32b68874 100644
--- a/inlong-manager/manager-service/pom.xml
+++ b/inlong-manager/manager-service/pom.xml
@@ -152,7 +152,10 @@
</exclusion>
</exclusions>
</dependency>
-
+ <dependency>
+ <groupId>com.tencentcloudapi</groupId>
+ <artifactId>tencentcloud-sdk-java-cls</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator4Pulsar.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator4Pulsar.java
index e39561a443..29eb364e9f 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator4Pulsar.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator4Pulsar.java
@@ -129,7 +129,7 @@ public class InlongGroupOperator4Pulsar extends
AbstractGroupOperator {
tenant = pulsarCluster.getPulsarTenant();
}
InlongPulsarTopicInfo topicInfo = new InlongPulsarTopicInfo();
- topicInfo.setTenant(tenant);
+ topicInfo.setPulsarTenant(tenant);
topicInfo.setNamespace(groupInfo.getMqResource());
// each inlong stream is associated with a Pulsar topic
List<String> topics =
streamService.getTopicList(groupInfo.getInlongGroupId()).stream()
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/cls/ClsDataNodeOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/cls/ClsDataNodeOperator.java
new file mode 100644
index 0000000000..40ec26f210
--- /dev/null
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/cls/ClsDataNodeOperator.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.node.cls;
+
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.DataNodeEntity;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
+import org.apache.inlong.manager.pojo.node.DataNodeRequest;
+import org.apache.inlong.manager.pojo.node.cls.ClsDataNodeDTO;
+import org.apache.inlong.manager.pojo.node.cls.ClsDataNodeInfo;
+import org.apache.inlong.manager.pojo.node.cls.ClsDataNodeRequest;
+import org.apache.inlong.manager.service.node.AbstractDataNodeOperator;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.tencentcloudapi.cls.v20201016.ClsClient;
+import com.tencentcloudapi.cls.v20201016.models.DescribeTopicsRequest;
+import com.tencentcloudapi.common.Credential;
+import com.tencentcloudapi.common.exception.TencentCloudSDKException;
+import com.tencentcloudapi.common.profile.ClientProfile;
+import com.tencentcloudapi.common.profile.HttpProfile;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+@Service
+public class ClsDataNodeOperator extends AbstractDataNodeOperator {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ClsDataNodeOperator.class);
+
+ @Autowired
+ private ObjectMapper objectMapper;
+
+ @Override
+ protected void setTargetEntity(DataNodeRequest request, DataNodeEntity
targetEntity) {
+ ClsDataNodeRequest clsDataNodeRequest = (ClsDataNodeRequest) request;
+ CommonBeanUtils.copyProperties(clsDataNodeRequest, targetEntity, true);
+ try {
+ ClsDataNodeDTO dto =
ClsDataNodeDTO.getFromRequest(clsDataNodeRequest, targetEntity.getExtParams());
+ targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
+ } catch (Exception e) {
+ throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
+ String.format("Failed to build extParams for Cloud log
service node: %s", e.getMessage()));
+ }
+ }
+
+ @Override
+ public Boolean accept(String dataNodeType) {
+ return DataNodeType.CLS.equals(dataNodeType);
+ }
+
+ @Override
+ public String getDataNodeType() {
+ return DataNodeType.CLS;
+ }
+
+ @Override
+ public DataNodeInfo getFromEntity(DataNodeEntity entity) {
+ if (entity == null) {
+ throw new BusinessException(ErrorCodeEnum.DATA_NODE_NOT_FOUND);
+ }
+ ClsDataNodeInfo info = new ClsDataNodeInfo();
+ CommonBeanUtils.copyProperties(entity, info);
+ if (StringUtils.isNotBlank(entity.getExtParams())) {
+ ClsDataNodeDTO dto =
ClsDataNodeDTO.getFromJson(entity.getExtParams());
+ CommonBeanUtils.copyProperties(dto, info);
+ }
+ return info;
+ }
+
+ @Override
+ public Boolean testConnection(DataNodeRequest request) {
+ ClsDataNodeRequest dataNodeRequest = (ClsDataNodeRequest) request;
+ Credential cred = new Credential(dataNodeRequest.getManageSecretId(),
dataNodeRequest.getManageSecretId());
+ HttpProfile httpProfile = new HttpProfile();
+ httpProfile.setEndpoint(dataNodeRequest.getEndpoint());
+ ClientProfile clientProfile = new ClientProfile();
+ clientProfile.setHttpProfile(httpProfile);
+ ClsClient client = new ClsClient(cred, dataNodeRequest.getRegion(),
clientProfile);
+ DescribeTopicsRequest req = new DescribeTopicsRequest();
+ try {
+ client.DescribeTopics(req);
+ } catch (TencentCloudSDKException e) {
+ String errMsg = String.format("connect tencent cloud error
endPoint = %s secretId = %s secretKey = %s",
+ dataNodeRequest.getEndpoint(),
dataNodeRequest.getManageSecretId(),
+ dataNodeRequest.getManageSecretKey());
+ LOGGER.error(errMsg, e);
+ throw new BusinessException(errMsg);
+ }
+ return true;
+ }
+}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/cls/ClsResourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/cls/ClsResourceOperator.java
new file mode 100644
index 0000000000..e69f298ffe
--- /dev/null
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/cls/ClsResourceOperator.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.resource.sink.cls;
+
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.enums.SinkStatus;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.dao.entity.DataNodeEntity;
+import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
+import org.apache.inlong.manager.dao.mapper.DataNodeEntityMapper;
+import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
+import org.apache.inlong.manager.pojo.node.cls.ClsDataNodeDTO;
+import org.apache.inlong.manager.pojo.sink.SinkInfo;
+import org.apache.inlong.manager.pojo.sink.cls.ClsSinkDTO;
+import org.apache.inlong.manager.service.resource.sink.SinkResourceOperator;
+import org.apache.inlong.manager.service.sink.StreamSinkService;
+
+import com.tencentcloudapi.cls.v20201016.ClsClient;
+import com.tencentcloudapi.cls.v20201016.models.CreateIndexRequest;
+import com.tencentcloudapi.cls.v20201016.models.CreateTopicRequest;
+import com.tencentcloudapi.cls.v20201016.models.CreateTopicResponse;
+import com.tencentcloudapi.cls.v20201016.models.FullTextInfo;
+import com.tencentcloudapi.cls.v20201016.models.RuleInfo;
+import com.tencentcloudapi.cls.v20201016.models.Tag;
+import com.tencentcloudapi.common.Credential;
+import com.tencentcloudapi.common.exception.TencentCloudSDKException;
+import com.tencentcloudapi.common.profile.ClientProfile;
+import com.tencentcloudapi.common.profile.HttpProfile;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+@Service
+public class ClsResourceOperator implements SinkResourceOperator {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ClsResourceOperator.class);
+
+ @Autowired
+ private DataNodeEntityMapper dataNodeEntityMapper;
+ @Autowired
+ private StreamSinkService sinkService;
+ @Autowired
+ private StreamSinkEntityMapper streamSinkEntityMapper;
+
+ @Override
+ public Boolean accept(String sinkType) {
+ return SinkType.CLS.equals(sinkType);
+ }
+
+ @Override
+ public void createSinkResource(SinkInfo sinkInfo) {
+ LOG.info("begin to create sink resources sinkId={}", sinkInfo.getId());
+ if
(SinkStatus.CONFIG_SUCCESSFUL.getCode().equals(sinkInfo.getStatus())) {
+ LOG.warn("sink resource [" + sinkInfo.getId() + "] already
success, skip to create");
+ return;
+ } else if
(InlongConstants.DISABLE_CREATE_RESOURCE.equals(sinkInfo.getEnableCreateResource()))
{
+ LOG.warn("create resource was disabled, skip to create for [" +
sinkInfo.getId() + "]");
+ return;
+ }
+ this.createTopicID(sinkInfo);
+ }
+
+ /**
+ * Create cloud log service topic
+ */
+ private void createTopicID(SinkInfo sinkInfo) {
+ ClsDataNodeDTO clsDataNode = getClsDataNode(sinkInfo);
+ ClsSinkDTO clsSinkDTO = JsonUtils.parseObject(sinkInfo.getExtParams(),
ClsSinkDTO.class);
+ try {
+ ClsClient client = getClsClient(clsDataNode);
+ CreateTopicRequest req = getCreateTopicRequest(clsDataNode,
clsSinkDTO);
+ CreateTopicResponse resp = client.CreateTopic(req);
+ LOG.info("create cls topic {} success ,topicId {}",
clsSinkDTO.getTopicName(), resp.getTopicId());
+ // update set topic id into sink info
+ clsSinkDTO.setTopicId(resp.getTopicId());
+ sinkInfo.setExtParams(JsonUtils.toJsonString(clsSinkDTO));
+ // create topic index by tokenizer
+ this.createTopicIndex(sinkInfo);
+ StreamSinkEntity streamSinkEntity = new StreamSinkEntity();
+ CommonBeanUtils.copyProperties(sinkInfo, streamSinkEntity, true);
+ streamSinkEntityMapper.updateByIdSelective(streamSinkEntity);
+ String info = "success to create cls resource";
+ sinkService.updateStatus(sinkInfo.getId(),
SinkStatus.CONFIG_SUCCESSFUL.getCode(), info);
+ LOG.info("update cls sink = {}info status success ,topicName {}",
streamSinkEntity.getSinkName(),
+ clsSinkDTO.getTopicName());
+ } catch (TencentCloudSDKException e) {
+ String errMsg = "Create cls topic failed: " + e.getMessage();
+ LOG.error(errMsg, e);
+ sinkService.updateStatus(sinkInfo.getId(),
SinkStatus.CONFIG_FAILED.getCode(), errMsg);
+ throw new BusinessException(errMsg);
+ }
+ }
+
+ private CreateTopicRequest getCreateTopicRequest(ClsDataNodeDTO
clsDataNode, ClsSinkDTO clsSinkDTO) {
+ CreateTopicRequest req = new CreateTopicRequest();
+ String[] allTags =
clsSinkDTO.getTag().split(InlongConstants.CENTER_LINE);
+ req.setTags(convertTags(allTags));
+ req.setLogsetId(clsDataNode.getLogSetId());
+ req.setTopicName(clsSinkDTO.getTopicName());
+ return req;
+ }
+
+ private ClsClient getClsClient(ClsDataNodeDTO clsDataNode) {
+ Credential cred = new Credential(clsDataNode.getManageSecretId(),
+ clsDataNode.getManageSecretId());
+ HttpProfile httpProfile = new HttpProfile();
+ httpProfile.setEndpoint(clsDataNode.getEndpoint());
+ ClientProfile clientProfile = new ClientProfile();
+
+ clientProfile.setHttpProfile(httpProfile);
+ return new ClsClient(cred, clsDataNode.getRegion(), clientProfile);
+ }
+
+ /**
+ * Create topic index by tokenizer
+ */
+ private void createTopicIndex(SinkInfo sinkInfo) throws BusinessException {
+ ClsSinkDTO clsSinkDTO = JsonUtils.parseObject(sinkInfo.getExtParams(),
ClsSinkDTO.class);
+ if (StringUtils.isNotBlank(clsSinkDTO.getTokenizer())) {
+ LOG.warn("topic {} tokenizer is empty", clsSinkDTO.getTopicName());
+ return;
+ }
+ ClsDataNodeDTO clsDataNode = getClsDataNode(sinkInfo);
+ ClsClient clsClient = getClsClient(clsDataNode);
+ RuleInfo ruleInfo = new RuleInfo();
+ FullTextInfo fullTextInfo = new FullTextInfo();
+ fullTextInfo.setTokenizer(clsSinkDTO.getTokenizer());
+ ruleInfo.setFullText(fullTextInfo);
+
+ CreateIndexRequest req = new CreateIndexRequest();
+ req.setTopicId(clsSinkDTO.getTopicId());
+ req.setRule(ruleInfo);
+ try {
+ clsClient.CreateIndex(req);
+ } catch (TencentCloudSDKException e) {
+ String errMsg = "Create cls topic index failed: " + e.getMessage();
+ LOG.error(errMsg, e);
+ sinkService.updateStatus(sinkInfo.getId(),
SinkStatus.CONFIG_FAILED.getCode(), errMsg);
+ throw new BusinessException(errMsg);
+ }
+ LOG.info("topic {} create index success tokenizer is {}",
clsSinkDTO.getTopicName(), clsSinkDTO.getTokenizer());
+ }
+
+ private Tag[] convertTags(String[] allTags) {
+ Tag[] tags = new Tag[allTags.length];
+ for (int i = 0; i < allTags.length; i++) {
+ String tag = allTags[i];
+ String[] keyAndValueOfTag = tag.split(InlongConstants.COLON);
+ Tag tagInfo = new Tag();
+ tagInfo.set(keyAndValueOfTag[0], keyAndValueOfTag[1]);
+ tags[i] = tagInfo;
+ }
+ return tags;
+ }
+
+ private ClsDataNodeDTO getClsDataNode(SinkInfo sinkInfo) {
+ DataNodeEntity dataNodeEntity =
dataNodeEntityMapper.selectByUniqueKey(sinkInfo.getDataNodeName(),
+ DataNodeType.CLS);
+ return JsonUtils.parseObject(dataNodeEntity.getExtParams(),
ClsDataNodeDTO.class);
+ }
+
+}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/pulsar/PulsarResourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/pulsar/PulsarResourceOperator.java
index 8f5935f8e4..0d6df611ea 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/pulsar/PulsarResourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/pulsar/PulsarResourceOperator.java
@@ -35,6 +35,7 @@ import
org.apache.inlong.manager.service.node.DataNodeOperateHelper;
import org.apache.inlong.manager.service.resource.queue.pulsar.PulsarOperator;
import org.apache.inlong.manager.service.resource.queue.pulsar.PulsarUtils;
import org.apache.inlong.manager.service.resource.sink.SinkResourceOperator;
+import org.apache.inlong.manager.service.sink.StreamSinkService;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -56,6 +57,8 @@ public class PulsarResourceOperator implements
SinkResourceOperator {
private DataNodeOperateHelper dataNodeHelper;
@Autowired
private PulsarOperator pulsarOperator;
+ @Autowired
+ private StreamSinkService sinkService;
@Override
public Boolean accept(String sinkType) {
@@ -99,6 +102,9 @@ public class PulsarResourceOperator implements
SinkResourceOperator {
.build();
// create topic
pulsarOperator.createTopic(pulsarAdmin, topicInfo);
+ final String info = "success to create Pulsar resource";
+ sinkService.updateStatus(sinkInfo.getId(),
SinkStatus.CONFIG_SUCCESSFUL.getCode(), info);
+ LOG.info(info + " for sinkInfo={}", sinkInfo);
} catch (PulsarClientException | PulsarAdminException e) {
LOG.error("init pulsar admin error", e);
throw new BusinessException();
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/cls/ClsSinkOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/cls/ClsSinkOperator.java
new file mode 100644
index 0000000000..ad8f4c69c1
--- /dev/null
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/cls/ClsSinkOperator.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.sink.cls;
+
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.dao.entity.DataNodeEntity;
+import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
+import org.apache.inlong.manager.dao.mapper.DataNodeEntityMapper;
+import org.apache.inlong.manager.pojo.node.cls.ClsDataNodeDTO;
+import org.apache.inlong.manager.pojo.sink.SinkField;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.sink.cls.ClsSink;
+import org.apache.inlong.manager.pojo.sink.cls.ClsSinkDTO;
+import org.apache.inlong.manager.pojo.sink.cls.ClsSinkRequest;
+import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Cloud log service sink operator
+ */
+@Service
+public class ClsSinkOperator extends AbstractSinkOperator {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ClsSinkOperator.class);
+ private static final String KEY_FIELDS = "fieldNames";
+ private static final String SECRET_KEY = "secretKey";
+ private static final String SECRET_ID = "secretId";
+ private static final String END_POINT = "endpoint";
+ private static final String TOPIC_ID = "topicId";
+
+ @Autowired
+ private ObjectMapper objectMapper;
+ @Autowired
+ private DataNodeEntityMapper dataNodeEntityMapper;
+
+ @Override
+ protected void setTargetEntity(SinkRequest request, StreamSinkEntity
targetEntity) {
+ if (!this.getSinkType().equals(request.getSinkType())) {
+ throw new BusinessException(ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT,
+ ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " +
getSinkType());
+ }
+ ClsSinkRequest sinkRequest = (ClsSinkRequest) request;
+ try {
+ ClsSinkDTO dto = ClsSinkDTO.getFromRequest(sinkRequest,
targetEntity.getExtParams());
+ targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
+ } catch (Exception e) {
+ throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
+ String.format("serialize extParams of Doris SinkDTO
failure: %s", e.getMessage()));
+ }
+ }
+
+ @Override
+ protected String getSinkType() {
+ return SinkType.CLS;
+ }
+
+ @Override
+ public Boolean accept(String sinkType) {
+ return SinkType.CLS.equals(sinkType);
+ }
+
+ @Override
+ public StreamSink getFromEntity(StreamSinkEntity entity) {
+ ClsSink sink = new ClsSink();
+ if (entity == null) {
+ return sink;
+ }
+
+ ClsSinkDTO dto = ClsSinkDTO.getFromJson(entity.getExtParams());
+ DataNodeEntity dataNodeEntity =
dataNodeEntityMapper.selectByUniqueKey(entity.getDataNodeName(),
+ DataNodeType.CLS);
+ ClsDataNodeDTO clsDataNodeDTO =
JsonUtils.parseObject(dataNodeEntity.getExtParams(),
+ ClsDataNodeDTO.class);
+ CommonBeanUtils.copyProperties(entity, sink, true);
+ CommonBeanUtils.copyProperties(dto, sink, true);
+ CommonBeanUtils.copyProperties(clsDataNodeDTO, sink, true);
+ List<SinkField> sinkFields = getSinkFields(entity.getId());
+ sink.setSinkFieldList(sinkFields);
+ return sink;
+ }
+
+ @Override
+ public Map<String, String> parse2IdParams(StreamSinkEntity streamSink,
List<String> fields) {
+ Map<String, String> params = super.parse2IdParams(streamSink, fields);
+ ClsSinkDTO clsSinkDTO =
JsonUtils.parseObject(streamSink.getExtParams(), ClsSinkDTO.class);
+ params.put(TOPIC_ID, clsSinkDTO.getTopicId());
+ DataNodeEntity dataNodeEntity =
dataNodeEntityMapper.selectByUniqueKey(streamSink.getDataNodeName(),
+ DataNodeType.CLS);
+ ClsDataNodeDTO clsDataNodeDTO =
JsonUtils.parseObject(dataNodeEntity.getExtParams(),
+ ClsDataNodeDTO.class);
+ params.put(SECRET_ID, clsDataNodeDTO.getSendSecretId());
+ params.put(SECRET_KEY, clsDataNodeDTO.getSendSecretKey());
+ params.put(END_POINT, clsDataNodeDTO.getEndpoint());
+ StringBuilder fieldNames = new StringBuilder();
+ for (String field : fields) {
+ fieldNames.append(field).append(InlongConstants.BLANK);
+ }
+ params.put(KEY_FIELDS, fieldNames.toString());
+ return params;
+ }
+}
diff --git a/licenses/inlong-manager/LICENSE b/licenses/inlong-manager/LICENSE
index d14e3fd5b4..2643cf219a 100644
--- a/licenses/inlong-manager/LICENSE
+++ b/licenses/inlong-manager/LICENSE
@@ -681,6 +681,7 @@ The text of each license is also included at
licenses/LICENSE-[project].txt.
org.springframework:spring-webmvc:5.3.18 - Spring Web MVC
(https://github.com/spring-projects/spring-framework), (Apache License, Version
2.0)
com.typesafe:ssl-config-core_2.11:0.3.7 - ssl-config-core
(https://github.com/lightbend/ssl-config/tree/v0.3.7), (Apache-2.0)
org.apache.tomcat.embed:tomcat-embed-core:9.0.60 - tomcat-embed-core
(https://tomcat.apache.org/), (Apache License, Version 2.0)
+ com.tencentcloudapi:tencentcloud-sdk-java-cls:3.1.830 -
tencentcloud-sdk-java-cls
(https://github.com/TencentCloud/tencentcloud-sdk-java-cls/tree/v3.1.830),
(Apache License, Version 2.0)
========================================================================
BSD licenses
diff --git a/pom.xml b/pom.xml
index ca8a551c21..576f0c73e6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -203,6 +203,7 @@
<jedis.version>2.9.0</jedis.version>
<poi.version>5.2.3</poi.version>
<otel.version>1.28.0</otel.version>
+ <tencentcloud-api.version>3.1.830</tencentcloud-api.version>
<woodstox-core.version>5.4.0</woodstox-core.version>
</properties>
@@ -1261,6 +1262,11 @@
<artifactId>poi-ooxml</artifactId>
<version>${poi.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.tencentcloudapi</groupId>
+ <artifactId>tencentcloud-sdk-java-cls</artifactId>
+ <version>${tencentcloud-api.version}</version>
+ </dependency>
</dependencies>
</dependencyManagement>