This is an automated email from the ASF dual-hosted git repository.
aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 4d3cc0a9fb [INLONG-11618][Manager] Support COS stream source (#11619)
4d3cc0a9fb is described below
commit 4d3cc0a9fb27b67a1f91a1791a37229af1af2499
Author: fuweng11 <[email protected]>
AuthorDate: Mon Dec 23 18:47:02 2024 +0800
[INLONG-11618][Manager] Support COS stream source (#11619)
---
.../inlong/manager/common/consts/DataNodeType.java | 1 +
.../inlong/manager/common/consts/SourceType.java | 2 +
.../manager/dao/mapper/DataNodeEntityMapper.java | 3 +
.../resources/mappers/DataNodeEntityMapper.xml | 10 ++
.../manager/pojo/node/cos/COSDataNodeDTO.java | 78 ++++++++++
.../manager/pojo/node/cos/COSDataNodeInfo.java | 65 +++++++++
.../manager/pojo/node/cos/COSDataNodeRequest.java | 52 +++++++
.../pojo/source/cos/COSDataAddTaskRequest.java | 42 ++++++
.../inlong/manager/pojo/source/cos/COSSource.java | 90 ++++++++++++
.../manager/pojo/source/cos/COSSourceDTO.java | 116 +++++++++++++++
.../manager/pojo/source/cos/COSSourceRequest.java | 71 +++++++++
.../manager/service/node/DataNodeService.java | 2 +
.../manager/service/node/DataNodeServiceImpl.java | 14 ++
.../service/node/cos/COSDataNodeOperator.java | 84 +++++++++++
.../service/source/AbstractSourceOperator.java | 11 ++
.../service/source/cos/COSSourceOperator.java | 160 +++++++++++++++++++++
16 files changed, 801 insertions(+)
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
index 47b139f159..35164d31d6 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
@@ -29,6 +29,7 @@ public class DataNodeType {
public static final String CLICKHOUSE = "CLICKHOUSE";
public static final String ELASTICSEARCH = "ELASTICSEARCH";
public static final String MYSQL = "MYSQL";
+ public static final String COS = "COS";
public static final String STARROCKS = "STARROCKS";
public static final String REDIS = "REDIS";
public static final String KUDU = "KUDU";
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java
index cb6f1a7f6b..59deca4749 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java
@@ -31,6 +31,7 @@ public class SourceType extends StreamType {
public static final String TUBEMQ = "TUBEMQ";
public static final String FILE = "FILE";
+ public static final String COS = "COS";
public static final String MYSQL_SQL = "MYSQL_SQL";
public static final String MYSQL_BINLOG = "MYSQL_BINLOG";
public static final String MONGODB = "MONGODB";
@@ -47,6 +48,7 @@ public class SourceType extends StreamType {
put(KAFKA, TaskTypeEnum.KAFKA);
put(FILE, TaskTypeEnum.FILE);
+ put(COS, TaskTypeEnum.COS);
put(MYSQL_SQL, TaskTypeEnum.SQL);
put(MYSQL_BINLOG, TaskTypeEnum.BINLOG);
put(POSTGRESQL, TaskTypeEnum.POSTGRES);
diff --git
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DataNodeEntityMapper.java
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DataNodeEntityMapper.java
index c469dccc13..a9c45e3f6f 100644
---
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DataNodeEntityMapper.java
+++
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DataNodeEntityMapper.java
@@ -58,4 +58,7 @@ public interface DataNodeEntityMapper {
@MultiTenantQuery(with = false)
List<DataNodeEntity> selectByIdSelective(DataNodeEntity record);
+ @MultiTenantQuery(with = false)
+ DataNodeEntity selectByUniqueKeyWithoutTenant(@Param("name") String name,
@Param("type") String type);
+
}
diff --git
a/inlong-manager/manager-dao/src/main/resources/mappers/DataNodeEntityMapper.xml
b/inlong-manager/manager-dao/src/main/resources/mappers/DataNodeEntityMapper.xml
index 7e62cb562a..7ec124aa74 100644
---
a/inlong-manager/manager-dao/src/main/resources/mappers/DataNodeEntityMapper.xml
+++
b/inlong-manager/manager-dao/src/main/resources/mappers/DataNodeEntityMapper.xml
@@ -263,4 +263,14 @@
and tenant = #{sourceTenant, jdbcType=VARCHAR}
</where>
</insert>
+ <select id="selectByUniqueKeyWithoutTenant"
resultType="org.apache.inlong.manager.dao.entity.DataNodeEntity">
+ select
+ <include refid="Base_Column_List"/>
+ from data_node
+ <where>
+ name = #{name, jdbcType=VARCHAR}
+ and type = #{type, jdbcType=VARCHAR}
+ and is_deleted = 0
+ </where>
+ </select>
</mapper>
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/cos/COSDataNodeDTO.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/cos/COSDataNodeDTO.java
new file mode 100644
index 0000000000..f3cb2973cc
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/cos/COSDataNodeDTO.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.cos;
+
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * COS data node info
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@ApiModel("COS data node info")
+public class COSDataNodeDTO {
+
+ @ApiModelProperty(value = "COS bucket name")
+ private String bucketName;
+
+ @ApiModelProperty(value = "COS secret id")
+ private String credentialsId;
+
+ @ApiModelProperty(value = "COS secret key")
+ private String credentialsKey;
+
+ @ApiModelProperty(value = "COS region")
+ private String region;
+
+ /**
+ * Get the dto instance from the request
+ */
+ public static COSDataNodeDTO getFromRequest(COSDataNodeRequest request,
String extParams) {
+ COSDataNodeDTO dto = StringUtils.isNotBlank(extParams)
+ ? COSDataNodeDTO.getFromJson(extParams)
+ : new COSDataNodeDTO();
+ return CommonBeanUtils.copyProperties(request, dto, true);
+ }
+
+ /**
+ * Get the dto instance from the JSON string.
+ */
+ public static COSDataNodeDTO getFromJson(@NotNull String extParams) {
+ try {
+ return JsonUtils.parseObject(extParams, COSDataNodeDTO.class);
+ } catch (Exception e) {
+ throw new BusinessException(ErrorCodeEnum.CLUSTER_INFO_INCORRECT,
+ String.format("Failed to parse extParams for COS node:
%s", e.getMessage()));
+ }
+ }
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/cos/COSDataNodeInfo.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/cos/COSDataNodeInfo.java
new file mode 100644
index 0000000000..06c89e40dc
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/cos/COSDataNodeInfo.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.cos;
+
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import lombok.experimental.SuperBuilder;
+
+/**
+ * COS data node info
+ */
+@Data
+@SuperBuilder
+@AllArgsConstructor
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = DataNodeType.COS)
+@ApiModel("COS data node info")
+public class COSDataNodeInfo extends DataNodeInfo {
+
+ @ApiModelProperty(value = "COS bucket name")
+ private String bucketName;
+
+ @ApiModelProperty(value = "COS secret id")
+ private String credentialsId;
+
+ @ApiModelProperty(value = "COS secret key")
+ private String credentialsKey;
+
+ @ApiModelProperty(value = "COS region")
+ private String region;
+
+ public COSDataNodeInfo() {
+ this.setType(DataNodeType.COS);
+ }
+
+ @Override
+ public COSDataNodeRequest genRequest() {
+ return CommonBeanUtils.copyProperties(this, COSDataNodeRequest::new);
+ }
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/cos/COSDataNodeRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/cos/COSDataNodeRequest.java
new file mode 100644
index 0000000000..2b3b212687
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/cos/COSDataNodeRequest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.cos;
+
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.node.DataNodeRequest;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+/**
+ * COS data node request
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = DataNodeType.COS)
+@ApiModel("COS data node request")
+public class COSDataNodeRequest extends DataNodeRequest {
+
+ @ApiModelProperty(value = "COS bucket name")
+ private String bucketName;
+
+ @ApiModelProperty(value = "COS secret id")
+ private String credentialsId;
+
+ @ApiModelProperty(value = "COS secret key")
+ private String credentialsKey;
+
+ @ApiModelProperty(value = "COS region")
+ private String region;
+
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/cos/COSDataAddTaskRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/cos/COSDataAddTaskRequest.java
new file mode 100644
index 0000000000..ca89b7ebab
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/cos/COSDataAddTaskRequest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.source.cos;
+
+import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.source.DataAddTaskRequest;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+import java.util.List;
+
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = SourceType.COS)
+@ApiModel(value = "COS data add task request")
+public class COSDataAddTaskRequest extends DataAddTaskRequest {
+
+ @ApiModelProperty("filterStreams")
+ private List<String> filterStreams;
+
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/cos/COSSource.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/cos/COSSource.java
new file mode 100644
index 0000000000..cb496689bf
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/cos/COSSource.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.source.cos;
+
+import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.source.SourceRequest;
+import org.apache.inlong.manager.pojo.source.StreamSource;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import lombok.experimental.SuperBuilder;
+
+import java.util.List;
+
+/**
+ * COS source info
+ */
+@Data
+@SuperBuilder
+@AllArgsConstructor
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "COS source info")
+@JsonTypeDefine(value = SourceType.COS)
+public class COSSource extends StreamSource {
+
+ @ApiModelProperty(value = "Path regex pattern for file, such as
/a/b/*.txt", required = true)
+ private String pattern;
+
+ @ApiModelProperty("Cycle unit")
+ private String cycleUnit;
+
+ @ApiModelProperty("Whether retry")
+ private Boolean retry;
+
+ @ApiModelProperty(value = "Data start time")
+ private String dataTimeFrom;
+
+ @ApiModelProperty(value = "Data end time")
+ private String dataTimeTo;
+
+ @ApiModelProperty("TimeOffset for collection, "
+ + "'1m' means from one minute after, '-1m' means from one minute
before, "
+ + "'1h' means from one hour after, '-1h' means from one minute
before, "
+ + "'1d' means from one day after, '-1d' means from one minute
before, "
+ + "Null or blank means from current timestamp")
+ private String timeOffset;
+
+ @ApiModelProperty("Max file count")
+ private String maxFileCount;
+
+ @ApiModelProperty(" Type of data result for column separator"
+ + " CSV format, set this parameter to a custom separator:
, | : "
+ + " Json format, set this parameter to json ")
+ private String contentStyle;
+
+ @ApiModelProperty("filterStreams")
+ private List<String> filterStreams;
+
+ public COSSource() {
+ this.setSourceType(SourceType.COS);
+ }
+
+ @Override
+ public SourceRequest genSourceRequest() {
+ return CommonBeanUtils.copyProperties(this, COSSourceRequest::new);
+ }
+
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/cos/COSSourceDTO.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/cos/COSSourceDTO.java
new file mode 100644
index 0000000000..1dd85385fc
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/cos/COSSourceDTO.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.source.cos;
+
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
+
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+
+import javax.validation.constraints.NotNull;
+
+import java.util.List;
+
+/**
+ * COS source information data transfer object
+ */
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+@Data
+@Slf4j
+public class COSSourceDTO {
+
+ @ApiModelProperty(value = "Path regex pattern for file, such as
/a/b/*.txt", required = true)
+ private String pattern;
+
+ @ApiModelProperty("Cycle unit")
+ private String cycleUnit = "D";
+
+ @ApiModelProperty("Whether retry")
+ private Boolean retry = false;;
+
+ @ApiModelProperty("Column separator of data source ")
+ private String dataSeparator;
+
+ @ApiModelProperty(value = "Data start time")
+ private String dataTimeFrom;
+
+ @ApiModelProperty(value = "Data end time")
+ private String dataTimeTo;
+
+ @ApiModelProperty("TimeOffset for collection, "
+ + "'1m' means from one minute after, '-1m' means from one minute
before, "
+ + "'1h' means from one hour after, '-1h' means from one minute
before, "
+ + "'1d' means from one day after, '-1d' means from one minute
before, "
+ + "Null or blank means from current timestamp")
+ private String timeOffset;
+
+ @ApiModelProperty("Max file count")
+ private String maxFileCount;
+
+ @ApiModelProperty(" Type of data result for column separator"
+ + " CSV format, set this parameter to a custom separator:
, | : "
+ + " Json format, set this parameter to json ")
+ private String contentStyle;
+
+ @ApiModelProperty(value = "Audit version")
+ private String auditVersion;
+
+ @ApiModelProperty("filterStreams")
+ private List<String> filterStreams;
+
+ @ApiModelProperty(value = "COS bucket name")
+ private String bucketName;
+
+ @ApiModelProperty(value = "COS secret id")
+ private String credentialsId;
+
+ @ApiModelProperty(value = "COS secret key")
+ private String credentialsKey;
+
+ @ApiModelProperty(value = "COS region")
+ private String region;
+
+ public static COSSourceDTO getFromRequest(@NotNull COSSourceRequest
cosSourceRequest, String extParams) {
+ COSSourceDTO dto = StringUtils.isNotBlank(extParams)
+ ? COSSourceDTO.getFromJson(extParams)
+ : new COSSourceDTO();
+ return CommonBeanUtils.copyProperties(cosSourceRequest, dto, true);
+ }
+
+ public static COSSourceDTO getFromJson(@NotNull String extParams) {
+ try {
+ log.info("teste extparmas={}", extParams);
+ return JsonUtils.parseObject(extParams, COSSourceDTO.class);
+ } catch (Exception e) {
+ log.info("teste extparmas=eoor:", e);
+ throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
+ String.format("parse extParams of COSSource failure: %s",
e.getMessage()));
+ }
+ }
+
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/cos/COSSourceRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/cos/COSSourceRequest.java
new file mode 100644
index 0000000000..18266d619a
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/cos/COSSourceRequest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.source.cos;
+
+import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.source.SourceRequest;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+import java.util.List;
+
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = SourceType.COS)
+@ApiModel(value = "COS source request")
+public class COSSourceRequest extends SourceRequest {
+
+ @ApiModelProperty(value = "Path regex pattern for file, such as
/a/b/*.txt", required = true)
+ private String pattern;
+
+ @ApiModelProperty("Cycle unit")
+ private String cycleUnit;
+
+ @ApiModelProperty("Whether retry")
+ private Boolean retry;
+
+ @ApiModelProperty(value = "Data start time")
+ private String dataTimeFrom;
+
+ @ApiModelProperty(value = "Data end time")
+ private String dataTimeTo;
+
+ @ApiModelProperty("TimeOffset for collection, "
+ + "'1m' means from one minute after, '-1m' means from one minute
before, "
+ + "'1h' means from one hour after, '-1h' means from one minute
before, "
+ + "'1d' means from one day after, '-1d' means from one minute
before, "
+ + "Null or blank means from current timestamp")
+ private String timeOffset;
+
+ @ApiModelProperty("Max file count")
+ private String maxFileCount;
+
+ @ApiModelProperty(" Type of data result for column separator"
+ + " CSV format, set this parameter to a custom separator:
, | : "
+ + " Json format, set this parameter to json ")
+ private String contentStyle;
+
+ @ApiModelProperty("filterStreams")
+ private List<String> filterStreams;
+}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeService.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeService.java
index 73210536dd..48b900d56b 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeService.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeService.java
@@ -156,4 +156,6 @@ public interface DataNodeService {
*/
Boolean testConnection(DataNodeRequest request);
+ DataNodeInfo getByKeyWithoutTenant(String name, String type);
+
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java
index 9022049acf..f7db3ec87e 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java
@@ -296,4 +296,18 @@ public class DataNodeServiceImpl implements
DataNodeService {
return result;
}
+ @Override
+ public DataNodeInfo getByKeyWithoutTenant(String name, String type) {
+ DataNodeEntity entity =
dataNodeMapper.selectByUniqueKeyWithoutTenant(name, type);
+ if (entity == null) {
+ LOGGER.error("data node not found by name={}, type={}", name,
type);
+ throw new BusinessException("data node not found");
+ }
+ String dataNodeType = entity.getType();
+ DataNodeOperator dataNodeOperator =
operatorFactory.getInstance(dataNodeType);
+ DataNodeInfo dataNodeInfo = dataNodeOperator.getFromEntity(entity);
+ LOGGER.debug("success to get data node info by name={}, type={}",
name, type);
+ return dataNodeInfo;
+ }
+
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/cos/COSDataNodeOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/cos/COSDataNodeOperator.java
new file mode 100644
index 0000000000..2e4cc594ba
--- /dev/null
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/cos/COSDataNodeOperator.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.node.cos;
+
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.DataNodeEntity;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
+import org.apache.inlong.manager.pojo.node.DataNodeRequest;
+import org.apache.inlong.manager.pojo.node.cos.COSDataNodeDTO;
+import org.apache.inlong.manager.pojo.node.cos.COSDataNodeInfo;
+import org.apache.inlong.manager.pojo.node.cos.COSDataNodeRequest;
+import org.apache.inlong.manager.service.node.AbstractDataNodeOperator;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * COS data node operator
+ */
+@Service
+public class COSDataNodeOperator extends AbstractDataNodeOperator {
+
+ @Autowired
+ private ObjectMapper objectMapper;
+
+ @Override
+ public Boolean accept(String dataNodeType) {
+ return getDataNodeType().equals(dataNodeType);
+ }
+
+ @Override
+ public String getDataNodeType() {
+ return DataNodeType.COS;
+ }
+
+ @Override
+ public DataNodeInfo getFromEntity(DataNodeEntity entity) {
+ if (entity == null) {
+ throw new BusinessException(ErrorCodeEnum.DATA_NODE_NOT_FOUND);
+ }
+
+ COSDataNodeInfo dataNodeInfo = new COSDataNodeInfo();
+ CommonBeanUtils.copyProperties(entity, dataNodeInfo);
+ if (StringUtils.isNotBlank(entity.getExtParams())) {
+ COSDataNodeDTO dto =
COSDataNodeDTO.getFromJson(entity.getExtParams());
+ CommonBeanUtils.copyProperties(dto, dataNodeInfo);
+ }
+ return dataNodeInfo;
+ }
+
+ @Override
+ protected void setTargetEntity(DataNodeRequest request, DataNodeEntity
targetEntity) {
+ COSDataNodeRequest dataNodeRequest = (COSDataNodeRequest) request;
+ CommonBeanUtils.copyProperties(dataNodeRequest, targetEntity, true);
+ try {
+ COSDataNodeDTO dto =
COSDataNodeDTO.getFromRequest(dataNodeRequest, targetEntity.getExtParams());
+ targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
+ } catch (Exception e) {
+ throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
+ String.format("Failed to build extParams for COS node:
%s", e.getMessage()));
+ }
+ }
+
+}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
index cdf6290281..83382dbcc1 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
@@ -59,6 +59,7 @@ import
org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarDTO;
import org.apache.inlong.manager.pojo.source.DataAddTaskRequest;
import org.apache.inlong.manager.pojo.source.SourceRequest;
import org.apache.inlong.manager.pojo.source.StreamSource;
+import org.apache.inlong.manager.pojo.source.cos.COSSourceDTO;
import org.apache.inlong.manager.pojo.source.file.FileSourceDTO;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.stream.StreamField;
@@ -554,6 +555,16 @@ public abstract class AbstractSourceOperator implements
StreamSourceOperator {
extParams = JsonUtils.toJsonString(fileSourceDTO);
}
}
+ if (SourceType.COS.equalsIgnoreCase(entity.getSourceType())) {
+ String dataSeparator = String.valueOf((char)
Integer.parseInt(streamEntity.getDataSeparator()));
+ COSSourceDTO cosSourceDTO = JsonUtils.parseObject(extParams,
COSSourceDTO.class);
+ if (Objects.nonNull(cosSourceDTO)) {
+ cosSourceDTO.setDataSeparator(dataSeparator);
+ dataConfig.setAuditVersion(cosSourceDTO.getAuditVersion());
+ cosSourceDTO.setContentStyle(streamEntity.getDataType());
+ extParams = JsonUtils.toJsonString(cosSourceDTO);
+ }
+ }
InlongStreamInfo streamInfo =
CommonBeanUtils.copyProperties(streamEntity, InlongStreamInfo::new);
// Processing extParams
unpackExtParams(streamEntity.getExtParams(), streamInfo);
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/cos/COSSourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/cos/COSSourceOperator.java
new file mode 100644
index 0000000000..f99a6321bd
--- /dev/null
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/cos/COSSourceOperator.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.source.cos;
+
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
+import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
+import org.apache.inlong.manager.pojo.node.cos.COSDataNodeInfo;
+import org.apache.inlong.manager.pojo.source.DataAddTaskDTO;
+import org.apache.inlong.manager.pojo.source.DataAddTaskRequest;
+import org.apache.inlong.manager.pojo.source.SourceRequest;
+import org.apache.inlong.manager.pojo.source.StreamSource;
+import org.apache.inlong.manager.pojo.source.cos.COSDataAddTaskRequest;
+import org.apache.inlong.manager.pojo.source.cos.COSSource;
+import org.apache.inlong.manager.pojo.source.cos.COSSourceDTO;
+import org.apache.inlong.manager.pojo.source.cos.COSSourceRequest;
+import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.manager.service.source.AbstractSourceOperator;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Isolation;
+import org.springframework.transaction.annotation.Transactional;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * COS source operator, such as get or set COS source info.
+ */
+@Service
+public class COSSourceOperator extends AbstractSourceOperator {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(COSSourceOperator.class);
+
+ @Autowired
+ private ObjectMapper objectMapper;
+
+ @Autowired
+ private StreamSourceEntityMapper sourceMapper;
+
+ @Override
+ public Boolean accept(String sourceType) {
+ return SourceType.COS.equals(sourceType);
+ }
+
+ @Override
+ protected String getSourceType() {
+ return SourceType.COS;
+ }
+
+ @Override
+ protected void setTargetEntity(SourceRequest request, StreamSourceEntity
targetEntity) {
+ COSSourceRequest sourceRequest = (COSSourceRequest) request;
+ try {
+ CommonBeanUtils.copyProperties(sourceRequest, targetEntity, true);
+ COSSourceDTO dto = COSSourceDTO.getFromRequest(sourceRequest,
targetEntity.getExtParams());
+ targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
+ } catch (Exception e) {
+ throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
+ String.format("serialize extParams of COS SourceDTO
failure: %s", e.getMessage()));
+ }
+ }
+
+ @Override
+ public StreamSource getFromEntity(StreamSourceEntity entity) {
+ COSSource source = new COSSource();
+ if (entity == null) {
+ return source;
+ }
+
+ COSSourceDTO dto = COSSourceDTO.getFromJson(entity.getExtParams());
+ CommonBeanUtils.copyProperties(entity, source, true);
+ CommonBeanUtils.copyProperties(dto, source, true);
+
+ List<StreamField> sourceFields = super.getSourceFields(entity.getId());
+ source.setFieldList(sourceFields);
+
+ List<StreamSourceEntity> dataAddTaskList =
sourceMapper.selectByTaskMapId(entity.getId());
+ source.setDataAddTaskList(dataAddTaskList.stream().map(subEntity ->
DataAddTaskDTO.builder()
+ .id(subEntity.getId())
+ .taskMapId(entity.getId())
+ .agentIp(subEntity.getAgentIp())
+ .status(subEntity.getStatus()).build())
+ .collect(Collectors.toList()));
+ return source;
+ }
+
+ @Override
+ public String getExtParams(StreamSourceEntity sourceEntity) {
+ COSSourceDTO cosSourceDTO =
COSSourceDTO.getFromJson(sourceEntity.getExtParams());
+ if (Objects.nonNull(cosSourceDTO) &&
StringUtils.isNotBlank(sourceEntity.getDataNodeName())) {
+ COSDataNodeInfo dataNodeInfo =
+ (COSDataNodeInfo)
dataNodeService.getByKeyWithoutTenant(sourceEntity.getDataNodeName(),
+ DataNodeType.COS);
+ CommonBeanUtils.copyProperties(dataNodeInfo, cosSourceDTO, true);
+ return JsonUtils.toJsonString(cosSourceDTO);
+ }
+ return sourceEntity.getExtParams();
+ }
+
+ @Override
+ @Transactional(rollbackFor = Throwable.class, isolation =
Isolation.REPEATABLE_READ)
+ public Integer addDataAddTask(DataAddTaskRequest request, String operator)
{
+ try {
+ COSDataAddTaskRequest sourceRequest = (COSDataAddTaskRequest)
request;
+ StreamSourceEntity sourceEntity =
sourceMapper.selectById(request.getSourceId());
+ COSSourceDTO dto =
COSSourceDTO.getFromJson(sourceEntity.getExtParams());
+ dto.setDataTimeFrom(sourceRequest.getDataTimeFrom());
+ dto.setDataTimeTo(sourceRequest.getDataTimeTo());
+ dto.setRetry(true);
+ if (request.getIncreaseAuditVersion()) {
+ dto.setAuditVersion(request.getAuditVersion());
+ }
+ dto.setFilterStreams(sourceRequest.getFilterStreams());
+ StreamSourceEntity dataAddTaskEntity =
+ CommonBeanUtils.copyProperties(sourceEntity,
StreamSourceEntity::new);
+ dataAddTaskEntity.setId(null);
+ dataAddTaskEntity.setSourceName(
+ sourceEntity.getSourceName() + "-" +
request.getAuditVersion() + "-" + sourceEntity.getId());
+
dataAddTaskEntity.setExtParams(objectMapper.writeValueAsString(dto));
+ dataAddTaskEntity.setTaskMapId(sourceEntity.getId());
+ Integer id = sourceMapper.insert(dataAddTaskEntity);
+ SourceRequest dataAddTaskRequest =
+ CommonBeanUtils.copyProperties(dataAddTaskEntity,
SourceRequest::new, true);
+ updateAgentTaskConfig(dataAddTaskRequest, operator);
+ return id;
+ } catch (Exception e) {
+ LOGGER.error("serialize extParams of COS SourceDTO failure: ", e);
+ throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
+ String.format("serialize extParams of COS SourceDTO
failure: %s", e.getMessage()));
+ }
+ }
+
+}