This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 4d3cc0a9fb [INLONG-11618][Manager] Support COS stream source (#11619)
4d3cc0a9fb is described below

commit 4d3cc0a9fb27b67a1f91a1791a37229af1af2499
Author: fuweng11 <[email protected]>
AuthorDate: Mon Dec 23 18:47:02 2024 +0800

    [INLONG-11618][Manager] Support COS stream source (#11619)
---
 .../inlong/manager/common/consts/DataNodeType.java |   1 +
 .../inlong/manager/common/consts/SourceType.java   |   2 +
 .../manager/dao/mapper/DataNodeEntityMapper.java   |   3 +
 .../resources/mappers/DataNodeEntityMapper.xml     |  10 ++
 .../manager/pojo/node/cos/COSDataNodeDTO.java      |  78 ++++++++++
 .../manager/pojo/node/cos/COSDataNodeInfo.java     |  65 +++++++++
 .../manager/pojo/node/cos/COSDataNodeRequest.java  |  52 +++++++
 .../pojo/source/cos/COSDataAddTaskRequest.java     |  42 ++++++
 .../inlong/manager/pojo/source/cos/COSSource.java  |  90 ++++++++++++
 .../manager/pojo/source/cos/COSSourceDTO.java      | 116 +++++++++++++++
 .../manager/pojo/source/cos/COSSourceRequest.java  |  71 +++++++++
 .../manager/service/node/DataNodeService.java      |   2 +
 .../manager/service/node/DataNodeServiceImpl.java  |  14 ++
 .../service/node/cos/COSDataNodeOperator.java      |  84 +++++++++++
 .../service/source/AbstractSourceOperator.java     |  11 ++
 .../service/source/cos/COSSourceOperator.java      | 160 +++++++++++++++++++++
 16 files changed, 801 insertions(+)

diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
index 47b139f159..35164d31d6 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
@@ -29,6 +29,7 @@ public class DataNodeType {
     public static final String CLICKHOUSE = "CLICKHOUSE";
     public static final String ELASTICSEARCH = "ELASTICSEARCH";
     public static final String MYSQL = "MYSQL";
+    public static final String COS = "COS";
     public static final String STARROCKS = "STARROCKS";
     public static final String REDIS = "REDIS";
     public static final String KUDU = "KUDU";
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java
index cb6f1a7f6b..59deca4749 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java
@@ -31,6 +31,7 @@ public class SourceType extends StreamType {
     public static final String TUBEMQ = "TUBEMQ";
 
     public static final String FILE = "FILE";
+    public static final String COS = "COS";
     public static final String MYSQL_SQL = "MYSQL_SQL";
     public static final String MYSQL_BINLOG = "MYSQL_BINLOG";
     public static final String MONGODB = "MONGODB";
@@ -47,6 +48,7 @@ public class SourceType extends StreamType {
             put(KAFKA, TaskTypeEnum.KAFKA);
 
             put(FILE, TaskTypeEnum.FILE);
+            put(COS, TaskTypeEnum.COS);
             put(MYSQL_SQL, TaskTypeEnum.SQL);
             put(MYSQL_BINLOG, TaskTypeEnum.BINLOG);
             put(POSTGRESQL, TaskTypeEnum.POSTGRES);
diff --git 
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DataNodeEntityMapper.java
 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DataNodeEntityMapper.java
index c469dccc13..a9c45e3f6f 100644
--- 
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DataNodeEntityMapper.java
+++ 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DataNodeEntityMapper.java
@@ -58,4 +58,7 @@ public interface DataNodeEntityMapper {
     @MultiTenantQuery(with = false)
     List<DataNodeEntity> selectByIdSelective(DataNodeEntity record);
 
+    /**
+     * Select a data node by its name and type.
+     * The mapped statement of the same id filters on name, type and is_deleted = 0
+     * and contains no tenant condition.
+     *
+     * @param name data node name
+     * @param type data node type
+     * @return the matching entity; NOTE(review): presumably null when nothing matches, the service caller checks for null
+     */
+    @MultiTenantQuery(with = false)
+    DataNodeEntity selectByUniqueKeyWithoutTenant(@Param("name") String name, 
@Param("type") String type);
+
 }
diff --git 
a/inlong-manager/manager-dao/src/main/resources/mappers/DataNodeEntityMapper.xml
 
b/inlong-manager/manager-dao/src/main/resources/mappers/DataNodeEntityMapper.xml
index 7e62cb562a..7ec124aa74 100644
--- 
a/inlong-manager/manager-dao/src/main/resources/mappers/DataNodeEntityMapper.xml
+++ 
b/inlong-manager/manager-dao/src/main/resources/mappers/DataNodeEntityMapper.xml
@@ -263,4 +263,14 @@
             and tenant = #{sourceTenant, jdbcType=VARCHAR}
         </where>
     </insert>
+    <!-- Look up a non-deleted data node by name and type; this statement has no tenant condition. -->
+    <!-- NOTE(review): uses resultType instead of a result map; confirm the snake_case columns of
+         Base_Column_List are mapped onto the entity fields (e.g. via mapUnderscoreToCamelCase). -->
+    <select id="selectByUniqueKeyWithoutTenant" 
resultType="org.apache.inlong.manager.dao.entity.DataNodeEntity">
+        select
+        <include refid="Base_Column_List"/>
+        from data_node
+        <where>
+            name = #{name, jdbcType=VARCHAR}
+            and type = #{type, jdbcType=VARCHAR}
+            and is_deleted = 0
+        </where>
+    </select>
 </mapper>
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/cos/COSDataNodeDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/cos/COSDataNodeDTO.java
new file mode 100644
index 0000000000..f3cb2973cc
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/cos/COSDataNodeDTO.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.cos;
+
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * Data transfer object for the COS-specific settings of a data node.
+ * Instances are parsed from, or merged into, the JSON kept in the node's extParams.
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@ApiModel("COS data node info")
+public class COSDataNodeDTO {
+
+    @ApiModelProperty(value = "COS bucket name")
+    private String bucketName;
+
+    @ApiModelProperty(value = "COS secret id")
+    private String credentialsId;
+
+    @ApiModelProperty(value = "COS secret key")
+    private String credentialsKey;
+
+    @ApiModelProperty(value = "COS region")
+    private String region;
+
+    /**
+     * Build a DTO for the given request.
+     * Starts from the DTO parsed out of {@code extParams} when that string is not blank,
+     * otherwise from an empty DTO, and then copies the request properties onto it.
+     *
+     * @param request the COS data node request to copy from
+     * @param extParams existing extParams JSON, may be blank
+     * @return the DTO returned by the property copy
+     */
+    public static COSDataNodeDTO getFromRequest(COSDataNodeRequest request, String extParams) {
+        COSDataNodeDTO target;
+        if (StringUtils.isBlank(extParams)) {
+            target = new COSDataNodeDTO();
+        } else {
+            target = COSDataNodeDTO.getFromJson(extParams);
+        }
+        return CommonBeanUtils.copyProperties(request, target, true);
+    }
+
+    /**
+     * Parse a DTO out of the given JSON string.
+     *
+     * @param extParams JSON string to parse
+     * @return the parsed DTO
+     * @throws BusinessException with {@code CLUSTER_INFO_INCORRECT} when parsing fails
+     */
+    public static COSDataNodeDTO getFromJson(@NotNull String extParams) {
+        try {
+            return JsonUtils.parseObject(extParams, COSDataNodeDTO.class);
+        } catch (Exception cause) {
+            String detail = String.format("Failed to parse extParams for COS node: %s", cause.getMessage());
+            throw new BusinessException(ErrorCodeEnum.CLUSTER_INFO_INCORRECT, detail);
+        }
+    }
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/cos/COSDataNodeInfo.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/cos/COSDataNodeInfo.java
new file mode 100644
index 0000000000..06c89e40dc
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/cos/COSDataNodeInfo.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.cos;
+
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import lombok.experimental.SuperBuilder;
+
+/**
+ * COS data node info.
+ * Adds the COS bucket, credential and region fields to the common {@link DataNodeInfo}.
+ */
+@Data
+@SuperBuilder
+@AllArgsConstructor
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = DataNodeType.COS)
+@ApiModel("COS data node info")
+public class COSDataNodeInfo extends DataNodeInfo {
+
+    @ApiModelProperty(value = "COS bucket name")
+    private String bucketName;
+
+    @ApiModelProperty(value = "COS secret id")
+    private String credentialsId;
+
+    @ApiModelProperty(value = "COS secret key")
+    private String credentialsKey;
+
+    @ApiModelProperty(value = "COS region")
+    private String region;
+
+    /**
+     * Creates an empty info object whose type is preset to {@link DataNodeType#COS}.
+     */
+    public COSDataNodeInfo() {
+        this.setType(DataNodeType.COS);
+    }
+
+    /**
+     * Creates a {@link COSDataNodeRequest} by copying this object's properties onto a new request instance.
+     */
+    @Override
+    public COSDataNodeRequest genRequest() {
+        return CommonBeanUtils.copyProperties(this, COSDataNodeRequest::new);
+    }
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/cos/COSDataNodeRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/cos/COSDataNodeRequest.java
new file mode 100644
index 0000000000..2b3b212687
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/cos/COSDataNodeRequest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.cos;
+
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.node.DataNodeRequest;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+/**
+ * COS data node request.
+ * Carries the COS bucket, credential and region fields on top of the common {@link DataNodeRequest}.
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = DataNodeType.COS)
+@ApiModel("COS data node request")
+public class COSDataNodeRequest extends DataNodeRequest {
+
+    @ApiModelProperty(value = "COS bucket name")
+    private String bucketName;
+
+    @ApiModelProperty(value = "COS secret id")
+    private String credentialsId;
+
+    @ApiModelProperty(value = "COS secret key")
+    private String credentialsKey;
+
+    @ApiModelProperty(value = "COS region")
+    private String region;
+
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/cos/COSDataAddTaskRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/cos/COSDataAddTaskRequest.java
new file mode 100644
index 0000000000..ca89b7ebab
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/cos/COSDataAddTaskRequest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.source.cos;
+
+import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.source.DataAddTaskRequest;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+import java.util.List;
+
+/**
+ * COS data add task request.
+ * Extends the common {@link DataAddTaskRequest} and is annotated with the {@link SourceType#COS} type.
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = SourceType.COS)
+@ApiModel(value = "COS data add task request")
+public class COSDataAddTaskRequest extends DataAddTaskRequest {
+
+    // NOTE(review): presumably the streams to filter on; this class does not show how the list is used, confirm
+    @ApiModelProperty("filterStreams")
+    private List<String> filterStreams;
+
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/cos/COSSource.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/cos/COSSource.java
new file mode 100644
index 0000000000..cb496689bf
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/cos/COSSource.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.source.cos;
+
+import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.source.SourceRequest;
+import org.apache.inlong.manager.pojo.source.StreamSource;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import lombok.experimental.SuperBuilder;
+
+import java.util.List;
+
+/**
+ * COS source info.
+ * Adds the COS collection settings (path pattern, cycle, time range, offsets, content style)
+ * to the common {@link StreamSource}.
+ */
+@Data
+@SuperBuilder
+@AllArgsConstructor
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "COS source info")
+@JsonTypeDefine(value = SourceType.COS)
+public class COSSource extends StreamSource {
+
+    @ApiModelProperty(value = "Path regex pattern for file, such as /a/b/*.txt", required = true)
+    private String pattern;
+
+    @ApiModelProperty("Cycle unit")
+    private String cycleUnit;
+
+    @ApiModelProperty("Whether retry")
+    private Boolean retry;
+
+    @ApiModelProperty(value = "Data start time")
+    private String dataTimeFrom;
+
+    @ApiModelProperty(value = "Data end time")
+    private String dataTimeTo;
+
+    // The '-1h' and '-1d' examples are described in hours and days respectively
+    @ApiModelProperty("TimeOffset for collection, "
+            + "'1m' means from one minute after, '-1m' means from one minute before, "
+            + "'1h' means from one hour after, '-1h' means from one hour before, "
+            + "'1d' means from one day after, '-1d' means from one day before, "
+            + "Null or blank means from current timestamp")
+    private String timeOffset;
+
+    @ApiModelProperty("Max file count")
+    private String maxFileCount;
+
+    @ApiModelProperty(" Type of data result for column separator"
+            + "         CSV format, set this parameter to a custom separator: , | : "
+            + "         Json format, set this parameter to json ")
+    private String contentStyle;
+
+    @ApiModelProperty("filterStreams")
+    private List<String> filterStreams;
+
+    /**
+     * Creates an empty source whose source type is preset to {@link SourceType#COS}.
+     */
+    public COSSource() {
+        this.setSourceType(SourceType.COS);
+    }
+
+    /**
+     * Creates a {@link COSSourceRequest} by copying this object's properties onto a new request instance.
+     *
+     * @return the request produced by the property copy
+     */
+    @Override
+    public SourceRequest genSourceRequest() {
+        return CommonBeanUtils.copyProperties(this, COSSourceRequest::new);
+    }
+
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/cos/COSSourceDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/cos/COSSourceDTO.java
new file mode 100644
index 0000000000..1dd85385fc
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/cos/COSSourceDTO.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.source.cos;
+
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
+
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+
+import javax.validation.constraints.NotNull;
+
+import java.util.List;
+
+/**
+ * COS source information data transfer object.
+ * Holds the COS collection settings together with the COS connection fields
+ * (bucket, credentials, region) in one JSON-serializable object.
+ */
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+@Data
+@Slf4j
+public class COSSourceDTO {
+
+    @ApiModelProperty(value = "Path regex pattern for file, such as /a/b/*.txt", required = true)
+    private String pattern;
+
+    // @Builder.Default keeps the initializer when the instance is created through the builder
+    @Builder.Default
+    @ApiModelProperty("Cycle unit")
+    private String cycleUnit = "D";
+
+    @Builder.Default
+    @ApiModelProperty("Whether retry")
+    private Boolean retry = false;
+
+    @ApiModelProperty("Column separator of data source ")
+    private String dataSeparator;
+
+    @ApiModelProperty(value = "Data start time")
+    private String dataTimeFrom;
+
+    @ApiModelProperty(value = "Data end time")
+    private String dataTimeTo;
+
+    // The '-1h' and '-1d' examples are described in hours and days respectively
+    @ApiModelProperty("TimeOffset for collection, "
+            + "'1m' means from one minute after, '-1m' means from one minute before, "
+            + "'1h' means from one hour after, '-1h' means from one hour before, "
+            + "'1d' means from one day after, '-1d' means from one day before, "
+            + "Null or blank means from current timestamp")
+    private String timeOffset;
+
+    @ApiModelProperty("Max file count")
+    private String maxFileCount;
+
+    @ApiModelProperty(" Type of data result for column separator"
+            + "         CSV format, set this parameter to a custom separator: , | : "
+            + "         Json format, set this parameter to json ")
+    private String contentStyle;
+
+    @ApiModelProperty(value = "Audit version")
+    private String auditVersion;
+
+    @ApiModelProperty("filterStreams")
+    private List<String> filterStreams;
+
+    @ApiModelProperty(value = "COS bucket name")
+    private String bucketName;
+
+    @ApiModelProperty(value = "COS secret id")
+    private String credentialsId;
+
+    @ApiModelProperty(value = "COS secret key")
+    private String credentialsKey;
+
+    @ApiModelProperty(value = "COS region")
+    private String region;
+
+    /**
+     * Build a DTO for the given request.
+     * Starts from the DTO parsed out of {@code extParams} when that string is not blank,
+     * otherwise from an empty DTO, and then copies the request properties onto it.
+     *
+     * @param cosSourceRequest the COS source request to copy from
+     * @param extParams existing extParams JSON, may be blank
+     * @return the DTO returned by the property copy
+     */
+    public static COSSourceDTO getFromRequest(@NotNull COSSourceRequest cosSourceRequest, String extParams) {
+        COSSourceDTO dto = StringUtils.isNotBlank(extParams)
+                ? COSSourceDTO.getFromJson(extParams)
+                : new COSSourceDTO();
+        return CommonBeanUtils.copyProperties(cosSourceRequest, dto, true);
+    }
+
+    /**
+     * Parse a DTO out of the given JSON string.
+     *
+     * @param extParams JSON string to parse
+     * @return the parsed DTO
+     * @throws BusinessException with {@code SOURCE_INFO_INCORRECT} when parsing fails
+     */
+    public static COSSourceDTO getFromJson(@NotNull String extParams) {
+        try {
+            return JsonUtils.parseObject(extParams, COSSourceDTO.class);
+        } catch (Exception e) {
+            // The raw extParams is deliberately not logged: it can carry credentialsKey
+            log.error("parse extParams of COSSource failure", e);
+            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
+                    String.format("parse extParams of COSSource failure: %s", e.getMessage()));
+        }
+    }
+
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/cos/COSSourceRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/cos/COSSourceRequest.java
new file mode 100644
index 0000000000..18266d619a
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/cos/COSSourceRequest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.source.cos;
+
+import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.source.SourceRequest;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+import java.util.List;
+
+/**
+ * COS source request.
+ * Carries the COS collection settings on top of the common {@link SourceRequest}.
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = SourceType.COS)
+@ApiModel(value = "COS source request")
+public class COSSourceRequest extends SourceRequest {
+
+    @ApiModelProperty(value = "Path regex pattern for file, such as /a/b/*.txt", required = true)
+    private String pattern;
+
+    @ApiModelProperty("Cycle unit")
+    private String cycleUnit;
+
+    @ApiModelProperty("Whether retry")
+    private Boolean retry;
+
+    @ApiModelProperty(value = "Data start time")
+    private String dataTimeFrom;
+
+    @ApiModelProperty(value = "Data end time")
+    private String dataTimeTo;
+
+    // The '-1h' and '-1d' examples are described in hours and days respectively
+    @ApiModelProperty("TimeOffset for collection, "
+            + "'1m' means from one minute after, '-1m' means from one minute before, "
+            + "'1h' means from one hour after, '-1h' means from one hour before, "
+            + "'1d' means from one day after, '-1d' means from one day before, "
+            + "Null or blank means from current timestamp")
+    private String timeOffset;
+
+    @ApiModelProperty("Max file count")
+    private String maxFileCount;
+
+    @ApiModelProperty(" Type of data result for column separator"
+            + "         CSV format, set this parameter to a custom separator: , | : "
+            + "         Json format, set this parameter to json ")
+    private String contentStyle;
+
+    @ApiModelProperty("filterStreams")
+    private List<String> filterStreams;
+}
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeService.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeService.java
index 73210536dd..48b900d56b 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeService.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeService.java
@@ -156,4 +156,6 @@ public interface DataNodeService {
      */
     Boolean testConnection(DataNodeRequest request);
 
+    /**
+     * Get the data node info by name and type.
+     * NOTE(review): per the method name the lookup is not scoped to a tenant; confirm against the implementation.
+     *
+     * @param name data node name
+     * @param type data node type
+     * @return data node info
+     */
+    DataNodeInfo getByKeyWithoutTenant(String name, String type);
+
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java
index 9022049acf..f7db3ec87e 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java
@@ -296,4 +296,18 @@ public class DataNodeServiceImpl implements 
DataNodeService {
         return result;
     }
 
+    /**
+     * Looks up a data node through the tenant-less mapper query and converts it
+     * with the operator registered for the entity's type.
+     *
+     * @throws BusinessException when the mapper returns no entity
+     */
+    @Override
+    public DataNodeInfo getByKeyWithoutTenant(String name, String type) {
+        DataNodeEntity found = dataNodeMapper.selectByUniqueKeyWithoutTenant(name, type);
+        if (found == null) {
+            LOGGER.error("data node not found by name={}, type={}", name, type);
+            throw new BusinessException("data node not found");
+        }
+        // The operator is chosen by the stored entity type, not by the requested type argument
+        DataNodeInfo result = operatorFactory.getInstance(found.getType()).getFromEntity(found);
+        LOGGER.debug("success to get data node info by name={}, type={}", name, type);
+        return result;
+    }
+
+
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/cos/COSDataNodeOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/cos/COSDataNodeOperator.java
new file mode 100644
index 0000000000..2e4cc594ba
--- /dev/null
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/cos/COSDataNodeOperator.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.node.cos;
+
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.DataNodeEntity;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
+import org.apache.inlong.manager.pojo.node.DataNodeRequest;
+import org.apache.inlong.manager.pojo.node.cos.COSDataNodeDTO;
+import org.apache.inlong.manager.pojo.node.cos.COSDataNodeInfo;
+import org.apache.inlong.manager.pojo.node.cos.COSDataNodeRequest;
+import org.apache.inlong.manager.service.node.AbstractDataNodeOperator;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * Data node operator for the COS node type.
+ * Converts {@link DataNodeEntity} objects into {@link COSDataNodeInfo} objects and
+ * stores the COS-specific request fields as JSON in the entity's extParams.
+ */
+@Service
+public class COSDataNodeOperator extends AbstractDataNodeOperator {
+
+    @Autowired
+    private ObjectMapper objectMapper;
+
+    /**
+     * Tells whether this operator handles the given data node type.
+     */
+    @Override
+    public Boolean accept(String dataNodeType) {
+        String supportedType = getDataNodeType();
+        return supportedType.equals(dataNodeType);
+    }
+
+    /**
+     * Returns the node type served by this operator, {@link DataNodeType#COS}.
+     */
+    @Override
+    public String getDataNodeType() {
+        return DataNodeType.COS;
+    }
+
+    /**
+     * Converts an entity into a COS data node info.
+     * Entity properties are copied first; when extParams is not blank, the fields of the
+     * DTO parsed from it are copied on top.
+     *
+     * @throws BusinessException with {@code DATA_NODE_NOT_FOUND} when the entity is null
+     */
+    @Override
+    public DataNodeInfo getFromEntity(DataNodeEntity entity) {
+        if (entity == null) {
+            throw new BusinessException(ErrorCodeEnum.DATA_NODE_NOT_FOUND);
+        }
+
+        COSDataNodeInfo info = new COSDataNodeInfo();
+        CommonBeanUtils.copyProperties(entity, info);
+        String extParams = entity.getExtParams();
+        if (StringUtils.isBlank(extParams)) {
+            return info;
+        }
+        CommonBeanUtils.copyProperties(COSDataNodeDTO.getFromJson(extParams), info);
+        return info;
+    }
+
+    /**
+     * Copies the request onto the target entity and rewrites the entity's extParams
+     * with the JSON form of the DTO built from the request and the current extParams.
+     *
+     * @throws BusinessException with {@code SOURCE_INFO_INCORRECT} when building the JSON fails
+     */
+    @Override
+    protected void setTargetEntity(DataNodeRequest request, DataNodeEntity targetEntity) {
+        COSDataNodeRequest cosRequest = (COSDataNodeRequest) request;
+        CommonBeanUtils.copyProperties(cosRequest, targetEntity, true);
+        try {
+            String currentExtParams = targetEntity.getExtParams();
+            COSDataNodeDTO dto = COSDataNodeDTO.getFromRequest(cosRequest, currentExtParams);
+            String serialized = objectMapper.writeValueAsString(dto);
+            targetEntity.setExtParams(serialized);
+        } catch (Exception e) {
+            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
+                    String.format("Failed to build extParams for COS node: %s", e.getMessage()));
+        }
+    }
+
+}
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
index cdf6290281..83382dbcc1 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
@@ -59,6 +59,7 @@ import 
org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarDTO;
 import org.apache.inlong.manager.pojo.source.DataAddTaskRequest;
 import org.apache.inlong.manager.pojo.source.SourceRequest;
 import org.apache.inlong.manager.pojo.source.StreamSource;
+import org.apache.inlong.manager.pojo.source.cos.COSSourceDTO;
 import org.apache.inlong.manager.pojo.source.file.FileSourceDTO;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.pojo.stream.StreamField;
@@ -554,6 +555,16 @@ public abstract class AbstractSourceOperator implements 
StreamSourceOperator {
                     extParams = JsonUtils.toJsonString(fileSourceDTO);
                 }
             }
+            if (SourceType.COS.equalsIgnoreCase(entity.getSourceType())) {
+                String dataSeparator = String.valueOf((char) 
Integer.parseInt(streamEntity.getDataSeparator()));
+                COSSourceDTO cosSourceDTO = JsonUtils.parseObject(extParams, 
COSSourceDTO.class);
+                if (Objects.nonNull(cosSourceDTO)) {
+                    cosSourceDTO.setDataSeparator(dataSeparator);
+                    dataConfig.setAuditVersion(cosSourceDTO.getAuditVersion());
+                    cosSourceDTO.setContentStyle(streamEntity.getDataType());
+                    extParams = JsonUtils.toJsonString(cosSourceDTO);
+                }
+            }
             InlongStreamInfo streamInfo = 
CommonBeanUtils.copyProperties(streamEntity, InlongStreamInfo::new);
             // Processing extParams
             unpackExtParams(streamEntity.getExtParams(), streamInfo);
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/cos/COSSourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/cos/COSSourceOperator.java
new file mode 100644
index 0000000000..f99a6321bd
--- /dev/null
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/cos/COSSourceOperator.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.source.cos;
+
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
+import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
+import org.apache.inlong.manager.pojo.node.cos.COSDataNodeInfo;
+import org.apache.inlong.manager.pojo.source.DataAddTaskDTO;
+import org.apache.inlong.manager.pojo.source.DataAddTaskRequest;
+import org.apache.inlong.manager.pojo.source.SourceRequest;
+import org.apache.inlong.manager.pojo.source.StreamSource;
+import org.apache.inlong.manager.pojo.source.cos.COSDataAddTaskRequest;
+import org.apache.inlong.manager.pojo.source.cos.COSSource;
+import org.apache.inlong.manager.pojo.source.cos.COSSourceDTO;
+import org.apache.inlong.manager.pojo.source.cos.COSSourceRequest;
+import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.manager.service.source.AbstractSourceOperator;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Isolation;
+import org.springframework.transaction.annotation.Transactional;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * COS source operator, such as get or set COS source info.
+ * <p>
+ * Converts between {@link StreamSourceEntity} rows and the COS-specific
+ * {@link COSSource} / {@link COSSourceDTO} representations, and creates
+ * "data add" sub tasks for an existing COS source.
+ */
+@Service
+public class COSSourceOperator extends AbstractSourceOperator {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(COSSourceOperator.class);
+
+    @Autowired
+    private ObjectMapper objectMapper;
+
+    @Autowired
+    private StreamSourceEntityMapper sourceMapper;
+
+    /**
+     * Reports whether this operator handles the given source type.
+     *
+     * @param sourceType the source type to test, may be null
+     * @return true only when {@code sourceType} equals {@link SourceType#COS}
+     */
+    @Override
+    public Boolean accept(String sourceType) {
+        return SourceType.COS.equals(sourceType);
+    }
+
+    /**
+     * Returns {@link SourceType#COS} as the source type of this operator.
+     */
+    @Override
+    protected String getSourceType() {
+        return SourceType.COS;
+    }
+
+    /**
+     * Fills the target entity from a COS source request.
+     * <p>
+     * Request properties are copied onto the entity, then a {@link COSSourceDTO} is built
+     * from the request together with the entity's current extParams and written back to the
+     * entity as a JSON string.
+     *
+     * @param request the incoming request; it is cast to {@link COSSourceRequest}
+     * @param targetEntity the entity to populate
+     * @throws BusinessException with {@code SOURCE_INFO_INCORRECT} when copying, building or
+     *         serializing fails
+     */
+    @Override
+    protected void setTargetEntity(SourceRequest request, StreamSourceEntity targetEntity) {
+        COSSourceRequest sourceRequest = (COSSourceRequest) request;
+        try {
+            CommonBeanUtils.copyProperties(sourceRequest, targetEntity, true);
+            COSSourceDTO dto = COSSourceDTO.getFromRequest(sourceRequest, targetEntity.getExtParams());
+            targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
+        } catch (Exception e) {
+            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
+                    String.format("serialize extParams of COS SourceDTO failure: %s", e.getMessage()));
+        }
+    }
+
+    /**
+     * Converts a stored source entity into a {@link COSSource}.
+     * <p>
+     * The entity's properties are copied first, then the properties parsed from its
+     * extParams. The source fields and the list of data-add sub tasks (entities selected
+     * by this entity's id as task map id) are attached afterwards.
+     *
+     * @param entity the source entity, may be null
+     * @return the populated source, or an empty {@link COSSource} when {@code entity} is null
+     */
+    @Override
+    public StreamSource getFromEntity(StreamSourceEntity entity) {
+        COSSource source = new COSSource();
+        if (entity == null) {
+            return source;
+        }
+
+        COSSourceDTO dto = COSSourceDTO.getFromJson(entity.getExtParams());
+        CommonBeanUtils.copyProperties(entity, source, true);
+        CommonBeanUtils.copyProperties(dto, source, true);
+
+        List<StreamField> sourceFields = super.getSourceFields(entity.getId());
+        source.setFieldList(sourceFields);
+
+        List<StreamSourceEntity> dataAddTaskList = sourceMapper.selectByTaskMapId(entity.getId());
+        source.setDataAddTaskList(dataAddTaskList.stream()
+                .map(subEntity -> DataAddTaskDTO.builder()
+                        .id(subEntity.getId())
+                        .taskMapId(entity.getId())
+                        .agentIp(subEntity.getAgentIp())
+                        .status(subEntity.getStatus())
+                        .build())
+                .collect(Collectors.toList()));
+        return source;
+    }
+
+    /**
+     * Returns the extParams of the source, enriched with the properties of its COS data node.
+     * <p>
+     * When the entity names a data node, that node is looked up and its properties are copied
+     * onto the DTO parsed from the entity's extParams; the merged DTO is returned as JSON.
+     * Otherwise the stored extParams string is returned unchanged.
+     *
+     * @param sourceEntity the source entity whose extParams are requested
+     * @return the merged extParams JSON, or the stored extParams when there is nothing to merge
+     */
+    @Override
+    public String getExtParams(StreamSourceEntity sourceEntity) {
+        COSSourceDTO cosSourceDTO = COSSourceDTO.getFromJson(sourceEntity.getExtParams());
+        if (Objects.nonNull(cosSourceDTO) && StringUtils.isNotBlank(sourceEntity.getDataNodeName())) {
+            COSDataNodeInfo dataNodeInfo = (COSDataNodeInfo) dataNodeService
+                    .getByKeyWithoutTenant(sourceEntity.getDataNodeName(), DataNodeType.COS);
+            CommonBeanUtils.copyProperties(dataNodeInfo, cosSourceDTO, true);
+            return JsonUtils.toJsonString(cosSourceDTO);
+        }
+        return sourceEntity.getExtParams();
+    }
+
+    /**
+     * Creates a data-add sub task for an existing COS source.
+     * <p>
+     * The parent source is loaded by {@code request.getSourceId()}, its extParams are adjusted
+     * with the requested time range, retry flag, filter streams and (optionally) audit version,
+     * and a copy of the parent entity carrying those extParams is inserted with the parent's id
+     * as task map id. The agent task config is then updated for the new sub task.
+     *
+     * @param request the data-add request; it is cast to {@link COSDataAddTaskRequest}
+     * @param operator the name of the operator performing the change
+     * @return the id of the inserted sub task entity.
+     *         NOTE(review): this assumes the mapper writes the generated key back onto the
+     *         inserted entity (e.g. {@code useGeneratedKeys}/{@code keyProperty="id"}) - confirm
+     *         in the StreamSourceEntityMapper definition.
+     * @throws BusinessException with {@code SOURCE_INFO_INCORRECT} when the parent source does
+     *         not exist or any other step fails; a BusinessException raised by a nested call
+     *         is propagated unchanged
+     */
+    @Override
+    @Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ)
+    public Integer addDataAddTask(DataAddTaskRequest request, String operator) {
+        try {
+            COSDataAddTaskRequest sourceRequest = (COSDataAddTaskRequest) request;
+            StreamSourceEntity sourceEntity = sourceMapper.selectById(request.getSourceId());
+            if (sourceEntity == null) {
+                // Fail with a meaningful message instead of a NullPointerException further down.
+                throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
+                        String.format("COS source not found by id=%s", request.getSourceId()));
+            }
+
+            COSSourceDTO dto = COSSourceDTO.getFromJson(sourceEntity.getExtParams());
+            dto.setDataTimeFrom(sourceRequest.getDataTimeFrom());
+            dto.setDataTimeTo(sourceRequest.getDataTimeTo());
+            dto.setRetry(true);
+            // The flag is a boxed Boolean; treat null as "do not increase" rather than unboxing it.
+            if (Boolean.TRUE.equals(request.getIncreaseAuditVersion())) {
+                dto.setAuditVersion(request.getAuditVersion());
+            }
+            dto.setFilterStreams(sourceRequest.getFilterStreams());
+
+            StreamSourceEntity dataAddTaskEntity =
+                    CommonBeanUtils.copyProperties(sourceEntity, StreamSourceEntity::new);
+            dataAddTaskEntity.setId(null);
+            dataAddTaskEntity.setSourceName(
+                    sourceEntity.getSourceName() + "-" + request.getAuditVersion() + "-" + sourceEntity.getId());
+            dataAddTaskEntity.setExtParams(objectMapper.writeValueAsString(dto));
+            dataAddTaskEntity.setTaskMapId(sourceEntity.getId());
+            // A MyBatis insert returns the number of affected rows, not the new key,
+            // so its return value must not be handed back as the task id.
+            sourceMapper.insert(dataAddTaskEntity);
+
+            SourceRequest dataAddTaskRequest =
+                    CommonBeanUtils.copyProperties(dataAddTaskEntity, SourceRequest::new, true);
+            updateAgentTaskConfig(dataAddTaskRequest, operator);
+            return dataAddTaskEntity.getId();
+        } catch (BusinessException e) {
+            // Already a well-formed business error; do not relabel it as a serialization failure.
+            throw e;
+        } catch (Exception e) {
+            LOGGER.error("serialize extParams of COS SourceDTO failure: ", e);
+            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
+                    String.format("serialize extParams of COS SourceDTO failure: %s", e.getMessage()));
+        }
+    }
+
+}


Reply via email to