This is an automated email from the ASF dual-hosted git repository.
tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 1f94676436 [Feature][Connector-V2][File] Add cos source&sink (#4979)
1f94676436 is described below
commit 1f94676436b730f6ef7b4d1e5d18fc2d689495e3
Author: dengdi <[email protected]>
AuthorDate: Wed Jul 26 23:43:09 2023 +0800
[Feature][Connector-V2][File] Add cos source&sink (#4979)
* [Feature][Connector-V2][File] Add cos sink
* update doc&e2e and add pom file header
* add e2e file header and config
* add file-cos module into dist pom.xml
* [Feature][Connector-V2][File] Add cos source
---------
Co-authored-by: dengd1937 <[email protected]>
---
docs/en/connector-v2/sink/CosFile.md | 259 ++++++++++++++++++
docs/en/connector-v2/source/CosFile.md | 289 +++++++++++++++++++++
plugin-mapping.properties | 2 +
.../seatunnel/file/config/FileSystemType.java | 1 +
.../connector-file/connector-file-cos/pom.xml | 64 +++++
.../seatunnel/file/cos/config/CosConf.java | 59 +++++
.../seatunnel/file/cos/config/CosConfig.java | 39 +++
.../seatunnel/file/cos/sink/CosFileSink.java | 63 +++++
.../file/cos/sink/CosFileSinkFactory.java | 88 +++++++
.../seatunnel/file/cos/source/CosFileSource.java | 119 +++++++++
.../file/cos/source/CosFileSourceFactory.java | 70 +++++
.../services/org.apache.hadoop.fs.FileSystem | 16 ++
.../seatunnel/file/cos/CosFileFactoryTest.java} | 27 +-
seatunnel-connectors-v2/connector-file/pom.xml | 1 +
seatunnel-dist/pom.xml | 6 +
.../connector-file-cos-e2e/pom.xml | 48 ++++
.../e2e/connector/file/cos/CosFileIT.java | 76 ++++++
.../test/resources/excel/cos_excel_to_assert.conf | 116 +++++++++
.../test/resources/excel/fake_to_cos_excel.conf | 82 ++++++
.../resources/json/cos_file_json_to_assert.conf | 114 ++++++++
.../test/resources/json/fake_to_cos_file_json.conf | 83 ++++++
.../test/resources/orc/cos_file_orc_to_assert.conf | 80 ++++++
.../test/resources/orc/fake_to_cos_file_orc.conf | 84 ++++++
.../parquet/cos_file_parquet_to_assert.conf | 80 ++++++
.../parquet/fake_to_cos_file_parquet.conf | 84 ++++++
.../resources/text/cos_file_text_to_assert.conf | 114 ++++++++
.../test/resources/text/fake_to_cos_file_text.conf | 84 ++++++
seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml | 1 +
28 files changed, 2132 insertions(+), 17 deletions(-)
diff --git a/docs/en/connector-v2/sink/CosFile.md
b/docs/en/connector-v2/sink/CosFile.md
new file mode 100644
index 0000000000..563b174c3c
--- /dev/null
+++ b/docs/en/connector-v2/sink/CosFile.md
@@ -0,0 +1,259 @@
+# CosFile
+
+> Cos file sink connector
+
+## Description
+
+Output data to cos file system.
+
+:::tip
+
+If you use spark/flink, you must ensure your spark/flink cluster has already
integrated hadoop in order to use this connector. The tested hadoop version is 2.x.
+
+If you use SeaTunnel Engine, the hadoop jar is integrated automatically when
you download and install SeaTunnel Engine. You can check the jar package under
${SEATUNNEL_HOME}/lib to confirm this.
+
+To use this connector you need to put hadoop-cos-{hadoop.version}-{version}.jar
and cos_api-bundle-{version}.jar in the ${SEATUNNEL_HOME}/lib dir, download:
[Hadoop-Cos-release](https://github.com/tencentyun/hadoop-cos/releases). It
only supports hadoop version 2.6.5+ and hadoop-cos version 8.0.2+.
+
+:::
+
+## Key features
+
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+
+By default, we use 2PC commit to ensure `exactly-once`
+
+- [x] file format type
+ - [x] text
+ - [x] csv
+ - [x] parquet
+ - [x] orc
+ - [x] json
+ - [x] excel
+
+## Options
+
+| name | type | required |
default value | remarks
|
+|----------------------------------|---------|----------|--------------------------------------------|-----------------------------------------------------------|
+| path | string | yes | -
|
|
+| bucket | string | yes | -
|
|
+| secret_id | string | yes | -
|
|
+| secret_key | string | yes | -
|
|
+| region | string | yes | -
|
|
+| custom_filename | boolean | no | false
| Whether you need custom the filename
|
+| file_name_expression | string | no | "${transactionId}"
| Only used when custom_filename is true
|
+| filename_time_format | string | no | "yyyy.MM.dd"
| Only used when custom_filename is true
|
+| file_format_type | string | no | "csv"
|
|
+| field_delimiter | string | no | '\001'
| Only used when file_format is text
|
+| row_delimiter | string | no | "\n"
| Only used when file_format is text
|
+| have_partition | boolean | no | false
| Whether you need processing partitions.
|
+| partition_by | array | no | -
| Only used when have_partition is true
|
+| partition_dir_expression | string | no |
"${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | Only used when have_partition is
true |
+| is_partition_field_write_in_file | boolean | no | false
| Only used when have_partition is true
|
+| sink_columns | array | no |
| When this parameter is empty, all fields are sink
columns |
+| is_enable_transaction | boolean | no | true
|
|
+| batch_size | int | no | 1000000
|
|
+| compress_codec | string | no | none
|
|
+| common-options | object | no | -
|
|
+| max_rows_in_memory | int | no | -
| Only used when file_format is excel.
|
+| sheet_name | string | no | Sheet${Random
number} | Only used when file_format is excel.
|
+
+### path [string]
+
+The target dir path is required.
+
+### bucket [string]
+
+The bucket address of cos file system, for example:
`cosn://seatunnel-test-1259587829`
+
+### secret_id [string]
+
+The secret id of cos file system.
+
+### secret_key [string]
+
+The secret key of cos file system.
+
+### region [string]
+
+The region of cos file system.
+
+### custom_filename [boolean]
+
+Whether custom the filename
+
+### file_name_expression [string]
+
+Only used when `custom_filename` is `true`
+
+`file_name_expression` describes the file expression which will be created
into the `path`. We can add the variable `${now}` or `${uuid}` in the
`file_name_expression`, like `test_${uuid}_${now}`,
+`${now}` represents the current time, and its format can be defined by
specifying the option `filename_time_format`.
+
+Please note that, If `is_enable_transaction` is `true`, we will auto add
`${transactionId}_` in the head of the file.
+
+### filename_time_format [string]
+
+Only used when `custom_filename` is `true`
+
+When the format in the `file_name_expression` parameter is `xxxx-${now}` ,
`filename_time_format` can specify the time format of the path, and the default
value is `yyyy.MM.dd` . The commonly used time formats are listed as follows:
+
+| Symbol | Description |
+|--------|--------------------|
+| y | Year |
+| M | Month |
+| d | Day of month |
+| H | Hour in day (0-23) |
+| m | Minute in hour |
+| s | Second in minute |
+
+### file_format_type [string]
+
+The following file types are supported:
+
+`text` `json` `csv` `orc` `parquet` `excel`
+
+Please note that, The final file name will end with the file_format's suffix,
the suffix of the text file is `txt`.
+
+### field_delimiter [string]
+
+The separator between columns in a row of data. Only needed by `text` file
format.
+
+### row_delimiter [string]
+
+The separator between rows in a file. Only needed by `text` file format.
+
+### have_partition [boolean]
+
+Whether you need processing partitions.
+
+### partition_by [array]
+
+Only used when `have_partition` is `true`.
+
+Partition data based on selected fields.
+
+### partition_dir_expression [string]
+
+Only used when `have_partition` is `true`.
+
+If the `partition_by` is specified, we will generate the corresponding
partition directory based on the partition information, and the final file will
be placed in the partition directory.
+
+Default `partition_dir_expression` is
`${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/`. `k0` is the first partition field
and `v0` is the value of the first partition field.
+
+### is_partition_field_write_in_file [boolean]
+
+Only used when `have_partition` is `true`.
+
+If `is_partition_field_write_in_file` is `true`, the partition field and its
value will be written into the data file.
+
+For example, if you want to write a Hive Data File, Its value should be
`false`.
+
+### sink_columns [array]
+
+The columns to be written to the file; the default is all the columns received
from `Transform` or `Source`.
+The order of the fields determines the order in which the file is actually
written.
+
+### is_enable_transaction [boolean]
+
+If `is_enable_transaction` is true, we will ensure that data will not be lost
or duplicated when it is written to the target directory.
+
+Please note that, If `is_enable_transaction` is `true`, we will auto add
`${transactionId}_` in the head of the file.
+
+Only support `true` now.
+
+### batch_size [int]
+
+The maximum number of rows in a file. For SeaTunnel Engine, the number of
lines in the file is determined jointly by `batch_size` and
`checkpoint.interval`. If the value of `checkpoint.interval` is large enough, the sink
writer will write rows to a file until the number of rows in the file exceeds
`batch_size`. If `checkpoint.interval` is small, the sink writer will create a
new file when a new checkpoint is triggered.
+
+### compress_codec [string]
+
+The compress codec of files. The codecs supported for each file format are
shown below:
+
+- txt: `lzo` `none`
+- json: `lzo` `none`
+- csv: `lzo` `none`
+- orc: `lzo` `snappy` `lz4` `zlib` `none`
+- parquet: `lzo` `snappy` `lz4` `gzip` `brotli` `zstd` `none`
+
+Tips: excel type does not support any compression format
+
+### common options
+
+Sink plugin common parameters, please refer to [Sink Common
Options](common-options.md) for details.
+
+### max_rows_in_memory [int]
+
+When the file format is excel, the maximum number of data items that can be cached
in memory.
+
+### sheet_name [string]
+
+The name of the workbook sheet to write.
+
+## Example
+
+For text file format with `have_partition` and `custom_filename` and
`sink_columns`
+
+```hocon
+
+ CosFile {
+ path="/sink"
+ bucket = "cosn://seatunnel-test-1259587829"
+ secret_id = "xxxxxxxxxxxxxxxxxxx"
+ secret_key = "xxxxxxxxxxxxxxxxxxx"
+ region = "ap-chengdu"
+ file_format_type = "text"
+ field_delimiter = "\t"
+ row_delimiter = "\n"
+ have_partition = true
+ partition_by = ["age"]
+ partition_dir_expression = "${k0}=${v0}"
+ is_partition_field_write_in_file = true
+ custom_filename = true
+ file_name_expression = "${transactionId}_${now}"
+ filename_time_format = "yyyy.MM.dd"
+ sink_columns = ["name","age"]
+ is_enable_transaction = true
+ }
+
+```
+
+For parquet file format with `have_partition` and `sink_columns`
+
+```hocon
+
+ CosFile {
+ path="/sink"
+ bucket = "cosn://seatunnel-test-1259587829"
+ secret_id = "xxxxxxxxxxxxxxxxxxx"
+ secret_key = "xxxxxxxxxxxxxxxxxxx"
+ region = "ap-chengdu"
+ have_partition = true
+ partition_by = ["age"]
+ partition_dir_expression = "${k0}=${v0}"
+ is_partition_field_write_in_file = true
+ file_format_type = "parquet"
+ sink_columns = ["name","age"]
+ }
+
+```
+
+For orc file format simple config
+
+```hocon
+
+ CosFile {
+ path="/sink"
+ bucket = "cosn://seatunnel-test-1259587829"
+ secret_id = "xxxxxxxxxxxxxxxxxxx"
+ secret_key = "xxxxxxxxxxxxxxxxxxx"
+ region = "ap-chengdu"
+ file_format_type = "orc"
+ }
+
+```
+
+## Changelog
+
+### next version
+
+- Add file cos sink connector
([4979](https://github.com/apache/seatunnel/pull/4979))
+
diff --git a/docs/en/connector-v2/source/CosFile.md
b/docs/en/connector-v2/source/CosFile.md
new file mode 100644
index 0000000000..18fc0299c9
--- /dev/null
+++ b/docs/en/connector-v2/source/CosFile.md
@@ -0,0 +1,289 @@
+# CosFile
+
+> Cos file source connector
+
+## Description
+
+Read data from Tencent Cloud COS file system.
+
+:::tip
+
+If you use spark/flink, you must ensure your spark/flink cluster has already
integrated hadoop in order to use this connector. The tested hadoop version is 2.x.
+
+If you use SeaTunnel Engine, the hadoop jar is integrated automatically when
you download and install SeaTunnel Engine. You can check the jar package under
${SEATUNNEL_HOME}/lib to confirm this.
+
+To use this connector you need to put hadoop-cos-{hadoop.version}-{version}.jar
and cos_api-bundle-{version}.jar in the ${SEATUNNEL_HOME}/lib dir, download:
[Hadoop-Cos-release](https://github.com/tencentyun/hadoop-cos/releases). It
only supports hadoop version 2.6.5+ and hadoop-cos version 8.0.2+.
+
+:::
+
+## Key features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [ ] [stream](../../concept/connector-v2-features.md)
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+
+Read all the data in a split in a pollNext call. What splits are read will be
saved in snapshot.
+
+- [x] [column projection](../../concept/connector-v2-features.md)
+- [x] [parallelism](../../concept/connector-v2-features.md)
+- [ ] [support user-defined split](../../concept/connector-v2-features.md)
+- [x] file format type
+ - [x] text
+ - [x] csv
+ - [x] parquet
+ - [x] orc
+ - [x] json
+ - [x] excel
+
+## Options
+
+| name | type | required | default value |
+|---------------------------|---------|----------|---------------------|
+| path | string | yes | - |
+| file_format_type | string | yes | - |
+| bucket | string | yes | - |
+| secret_id | string | yes | - |
+| secret_key | string | yes | - |
+| region | string | yes | - |
+| read_columns | list | no | - |
+| delimiter | string | no | \001 |
+| parse_partition_from_path | boolean | no | true |
+| skip_header_row_number | long | no | 0 |
+| date_format | string | no | yyyy-MM-dd |
+| datetime_format | string | no | yyyy-MM-dd HH:mm:ss |
+| time_format | string | no | HH:mm:ss |
+| schema | config | no | - |
+| common-options | | no | - |
+| sheet_name | string | no | - |
+
+### path [string]
+
+The source file path.
+
+### delimiter [string]
+
+Field delimiter, used to tell connector how to slice and dice fields when
reading text files
+
+default `\001`, the same as hive's default delimiter
+
+### parse_partition_from_path [boolean]
+
+Control whether parse the partition keys and values from file path
+
+For example if you read a file from path
`cosn://hadoop-cluster/tmp/seatunnel/parquet/name=tyrantlucifer/age=26`
+
+Every record data from file will be added these two fields:
+
+| name | age |
+|---------------|-----|
+| tyrantlucifer | 26 |
+
+Tips: **Do not define partition fields in schema option**
+
+### date_format [string]
+
+Date type format, used to tell connector how to convert string to date,
supported as the following formats:
+
+`yyyy-MM-dd` `yyyy.MM.dd` `yyyy/MM/dd`
+
+default `yyyy-MM-dd`
+
+### datetime_format [string]
+
+Datetime type format, used to tell connector how to convert string to
datetime, supported as the following formats:
+
+`yyyy-MM-dd HH:mm:ss` `yyyy.MM.dd HH:mm:ss` `yyyy/MM/dd HH:mm:ss`
`yyyyMMddHHmmss`
+
+default `yyyy-MM-dd HH:mm:ss`
+
+### time_format [string]
+
+Time type format, used to tell connector how to convert string to time,
supported as the following formats:
+
+`HH:mm:ss` `HH:mm:ss.SSS`
+
+default `HH:mm:ss`
+
+### skip_header_row_number [long]
+
+Skip the first few lines, but only for the txt and csv.
+
+For example, set like following:
+
+`skip_header_row_number = 2`
+
+then SeaTunnel will skip the first 2 lines from source files
+
+### file_format_type [string]
+
+File type, supported as the following file types:
+
+`text` `csv` `parquet` `orc` `json` `excel`
+
+If you assign file type to `json`, you should also assign schema option to
tell connector how to parse data to the row you want.
+
+For example:
+
+upstream data is the following:
+
+```json
+
+{"code": 200, "data": "get success", "success": true}
+
+```
+
+You can also save multiple pieces of data in one file and split them by
newline:
+
+```json lines
+
+{"code": 200, "data": "get success", "success": true}
+{"code": 300, "data": "get failed", "success": false}
+
+```
+
+you should assign schema as the following:
+
+```hocon
+
+schema {
+ fields {
+ code = int
+ data = string
+ success = boolean
+ }
+}
+
+```
+
+connector will generate data as the following:
+
+| code | data | success |
+|------|-------------|---------|
+| 200 | get success | true |
+
+If you assign file type to `parquet` `orc`, schema option not required,
connector can find the schema of upstream data automatically.
+
+If you assign file type to `text` `csv`, you can choose to specify the schema
information or not.
+
+For example, upstream data is the following:
+
+```text
+
+tyrantlucifer#26#male
+
+```
+
+If you do not assign data schema connector will treat the upstream data as the
following:
+
+| content |
+|-----------------------|
+| tyrantlucifer#26#male |
+
+If you assign data schema, you should also assign the option `delimiter` too
except CSV file type
+
+you should assign schema and delimiter as the following:
+
+```hocon
+
+delimiter = "#"
+schema {
+ fields {
+ name = string
+ age = int
+ gender = string
+ }
+}
+
+```
+
+connector will generate data as the following:
+
+| name | age | gender |
+|---------------|-----|--------|
+| tyrantlucifer | 26 | male |
+
+### bucket [string]
+
+The bucket address of Cos file system, for example:
`cosn://tyrantlucifer-image-bed`
+
+### secret_id [string]
+
+The secret id of Cos file system.
+
+### secret_key [string]
+
+The secret key of Cos file system.
+
+### region [string]
+
+The region of cos file system.
+
+### schema [config]
+
+#### fields [Config]
+
+The schema of upstream data.
+
+### read_columns [list]
+
+The read column list of the data source, user can use it to implement field
projection.
+
+The file types that support column projection are shown below:
+
+- text
+- json
+- csv
+- orc
+- parquet
+- excel
+
+**Tips: If the user wants to use this feature when reading `text` `json` `csv`
files, the schema option must be configured**
+
+### common options
+
+Source plugin common parameters, please refer to [Source Common
Options](common-options.md) for details.
+
+### sheet_name [string]
+
+The sheet of the workbook to read. Only used when file_format_type is excel.
+
+## Example
+
+```hocon
+
+ CosFile {
+ path = "/seatunnel/orc"
+ bucket = "cosn://seatunnel-test-1259587829"
+ secret_id = "xxxxxxxxxxxxxxxxxxx"
+ secret_key = "xxxxxxxxxxxxxxxxxxx"
+ region = "ap-chengdu"
+ file_format_type = "orc"
+ }
+
+```
+
+```hocon
+
+ CosFile {
+ path = "/seatunnel/json"
+ bucket = "cosn://seatunnel-test-1259587829"
+ secret_id = "xxxxxxxxxxxxxxxxxxx"
+ secret_key = "xxxxxxxxxxxxxxxxxxx"
+ region = "ap-chengdu"
+ file_format_type = "json"
+ schema {
+ fields {
+ id = int
+ name = string
+ }
+ }
+ }
+
+```
+
+## Changelog
+
+### next version
+
+- Add file cos source connector
([4979](https://github.com/apache/seatunnel/pull/4979))
+
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index 3943159aef..a1c4e40fbb 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -49,6 +49,8 @@ seatunnel.source.OssFile = connector-file-oss
seatunnel.sink.OssFile = connector-file-oss
seatunnel.source.OssJindoFile = connector-file-oss-jindo
seatunnel.sink.OssJindoFile = connector-file-oss-jindo
+seatunnel.source.CosFile = connector-file-cos
+seatunnel.sink.CosFile = connector-file-cos
seatunnel.source.Pulsar = connector-pulsar
seatunnel.source.Hudi = connector-hudi
seatunnel.sink.DingTalk = connector-dingtalk
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSystemType.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSystemType.java
index 8d50cee469..3d3965b7c3 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSystemType.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSystemType.java
@@ -24,6 +24,7 @@ public enum FileSystemType implements Serializable {
LOCAL("LocalFile"),
OSS("OssFile"),
OSS_JINDO("OssJindoFile"),
+ COS("CosFile"),
FTP("FtpFile"),
SFTP("SftpFile"),
S3("S3File");
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-cos/pom.xml
b/seatunnel-connectors-v2/connector-file/connector-file-cos/pom.xml
new file mode 100644
index 0000000000..457357ad81
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-cos/pom.xml
@@ -0,0 +1,64 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-file</artifactId>
+ <version>${revision}</version>
+ </parent>
+
+ <artifactId>connector-file-cos</artifactId>
+ <name>SeaTunnel : Connectors V2 : File : Cos</name>
+
+ <properties>
+ <hadoop-cos.version>2.6.5-8.0.2</hadoop-cos.version>
+ </properties>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-file-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-shaded-hadoop-2</artifactId>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>com.qcloud.cos</groupId>
+ <artifactId>hadoop-cos</artifactId>
+ <version>${hadoop-cos.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ </dependencies>
+</project>
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/config/CosConf.java
b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/config/CosConf.java
new file mode 100644
index 0000000000..211c245368
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/config/CosConf.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.cos.config;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+
+import org.apache.hadoop.fs.CosNConfigKeys;
+
+import java.util.HashMap;
+/**
+ * Hadoop configuration for the COS file connector. It reports
+ * "org.apache.hadoop.fs.CosFileSystem" as the file system implementation and "cosn" as the
+ * schema, and can be assembled from a plugin config through buildWithConfig.
+ */
+public class CosConf extends HadoopConf {
+ private static final String HDFS_IMPL =
"org.apache.hadoop.fs.CosFileSystem";
+ private static final String SCHEMA = "cosn";
+ /**
+ * Returns the class name of the Hadoop file system implementation used for COS.
+ *
+ * @return the constant "org.apache.hadoop.fs.CosFileSystem"
+ */
+ @Override
+ public String getFsHdfsImpl() {
+ return HDFS_IMPL;
+ }
+ /**
+ * Returns the schema name used for COS.
+ *
+ * @return the constant "cosn"
+ */
+ @Override
+ public String getSchema() {
+ return SCHEMA;
+ }
+ /**
+ * Creates a COS configuration.
+ *
+ * @param hdfsNameKey value handed unchanged to the HadoopConf constructor; buildWithConfig
+ *     passes the configured bucket address here
+ */
+ public CosConf(String hdfsNameKey) {
+ super(hdfsNameKey);
+ }
+ /**
+ * Builds a CosConf from the plugin config. The bucket option becomes the constructor
+ * argument; secret_id, secret_key and region are copied into the extra options under the
+ * matching CosNConfigKeys keys.
+ *
+ * @param config plugin config that is read for the bucket, secret_id, secret_key and region
+ *     options. NOTE(review): a missing key presumably makes config.getString throw; confirm
+ *     that callers validate the keys first, as CosFileSink.prepare does
+ * @return the populated configuration, typed as HadoopConf
+ */
+ public static HadoopConf buildWithConfig(Config config) {
+ HadoopConf hadoopConf = new
CosConf(config.getString(CosConfig.BUCKET.key()));
+ HashMap<String, String> cosOptions = new HashMap<>();
+ cosOptions.put(
+ CosNConfigKeys.COSN_USERINFO_SECRET_ID_KEY,
+ config.getString(CosConfig.SECRET_ID.key()));
+ cosOptions.put(
+ CosNConfigKeys.COSN_USERINFO_SECRET_KEY_KEY,
+ config.getString(CosConfig.SECRET_KEY.key()));
+ cosOptions.put(CosNConfigKeys.COSN_REGION_KEY,
config.getString(CosConfig.REGION.key()));
+ hadoopConf.setExtraOptions(cosOptions);
+ return hadoopConf;
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/config/CosConfig.java
b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/config/CosConfig.java
new file mode 100644
index 0000000000..cbbd68ef7d
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/config/CosConfig.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.cos.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
+/**
+ * Option definitions specific to the COS file connector: secret_id, secret_key, region and
+ * bucket. All four are string options declared without a default value.
+ *
+ * <p>NOTE(review): the sink classes also reference CosConfig.FILE_PATH, which is not declared
+ * here; presumably it is inherited from BaseSourceConfig. Confirm against that class.
+ */
+public class CosConfig extends BaseSourceConfig {
+ public static final Option<String> SECRET_ID =
+ Options.key("secret_id")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("COS bucket secret id");
+ public static final Option<String> SECRET_KEY =
+ Options.key("secret_key")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("COS bucket secret key");
+ public static final Option<String> REGION =
+
Options.key("region").stringType().noDefaultValue().withDescription("COS
region");
+ public static final Option<String> BUCKET =
+
Options.key("bucket").stringType().noDefaultValue().withDescription("COS
bucket");
+}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/sink/CosFileSink.java
b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/sink/CosFileSink.java
new file mode 100644
index 0000000000..bfc6fa4ff1
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/sink/CosFileSink.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.cos.sink;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
+import org.apache.seatunnel.connectors.seatunnel.file.cos.config.CosConf;
+import org.apache.seatunnel.connectors.seatunnel.file.cos.config.CosConfig;
+import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink;
+
+import com.google.auto.service.AutoService;
+/**
+ * File sink for COS, registered as a SeaTunnelSink through AutoService. Its plugin name is
+ * taken from FileSystemType.COS (declared with the name "CosFile"), and prepare() validates
+ * the COS options before building the Hadoop configuration.
+ */
+@AutoService(SeaTunnelSink.class)
+public class CosFileSink extends BaseFileSink {
+ @Override
+ public String getPluginName() {
+ return FileSystemType.COS.getFileSystemPluginName();
+ }
+ /**
+ * Prepares the sink: delegates to BaseFileSink.prepare first, then checks that the path,
+ * region, secret_id, secret_key and bucket options are all present, and finally stores the
+ * configuration built by CosConf.buildWithConfig in hadoopConf.
+ *
+ * @param pluginConfig the sink's plugin config
+ * @throws PrepareFailException declared by the overridden method; not thrown directly in
+ *     this body (presumably raised by super.prepare; confirm)
+ * @throws FileConnectorException with CONFIG_VALIDATION_FAILED when any of the checked
+ *     options is missing; the message carries the plugin name, PluginType.SINK and the check
+ *     result message
+ */
+ @Override
+ public void prepare(Config pluginConfig) throws PrepareFailException {
+ super.prepare(pluginConfig);
+ CheckResult result =
+ CheckConfigUtil.checkAllExists(
+ pluginConfig,
+ CosConfig.FILE_PATH.key(),
+ CosConfig.REGION.key(),
+ CosConfig.SECRET_ID.key(),
+ CosConfig.SECRET_KEY.key(),
+ CosConfig.BUCKET.key());
+ if (!result.isSuccess()) {
+ throw new FileConnectorException(
+ SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+ String.format(
+ "PluginName: %s, PluginType: %s, Message: %s",
+ getPluginName(), PluginType.SINK,
result.getMsg()));
+ }
+ hadoopConf = CosConf.buildWithConfig(pluginConfig);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/sink/CosFileSinkFactory.java
b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/sink/CosFileSinkFactory.java
new file mode 100644
index 0000000000..9de5386bc6
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/sink/CosFileSinkFactory.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.cos.sink;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
+import org.apache.seatunnel.connectors.seatunnel.file.cos.config.CosConfig;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class) // published as a Factory service through AutoService
+public class CosFileSinkFactory implements TableSinkFactory { // declares the option rules of the COS file sink
+ @Override
+ public String factoryIdentifier() { // identifier is the COS plugin name, the same value CosFileSink#getPluginName returns
+ return FileSystemType.COS.getFileSystemPluginName();
+ }
+
+ @Override
+ public OptionRule optionRule() { // builds the rule set: required, optional and value-dependent options
+ return OptionRule.builder()
+ .required(CosConfig.FILE_PATH) // the five options below are mandatory; CosFileSink#prepare checks the same keys
+ .required(CosConfig.BUCKET)
+ .required(CosConfig.SECRET_ID)
+ .required(CosConfig.SECRET_KEY)
+ .required(CosConfig.REGION)
+ .optional(BaseSinkConfig.FILE_FORMAT_TYPE) // format may be omitted; the default is not visible here
+ .conditional( // options tied to FILE_FORMAT_TYPE == TEXT
+ BaseSinkConfig.FILE_FORMAT_TYPE,
+ FileFormat.TEXT,
+ BaseSinkConfig.ROW_DELIMITER,
+ BaseSinkConfig.FIELD_DELIMITER,
+ BaseSinkConfig.TXT_COMPRESS)
+ .conditional( // options tied to FILE_FORMAT_TYPE == CSV
+ BaseSinkConfig.FILE_FORMAT_TYPE,
+ FileFormat.CSV,
+ BaseSinkConfig.TXT_COMPRESS)
+ .conditional( // options tied to FILE_FORMAT_TYPE == JSON
+ BaseSinkConfig.FILE_FORMAT_TYPE,
+ FileFormat.JSON,
+ BaseSinkConfig.TXT_COMPRESS)
+ .conditional( // options tied to FILE_FORMAT_TYPE == ORC
+ BaseSinkConfig.FILE_FORMAT_TYPE,
+ FileFormat.ORC,
+ BaseSinkConfig.ORC_COMPRESS)
+ .conditional( // options tied to FILE_FORMAT_TYPE == PARQUET
+ BaseSinkConfig.FILE_FORMAT_TYPE,
+ FileFormat.PARQUET,
+ BaseSinkConfig.PARQUET_COMPRESS)
+ .optional(BaseSinkConfig.CUSTOM_FILENAME)
+ .conditional( // file-name options tied to CUSTOM_FILENAME == true
+ BaseSinkConfig.CUSTOM_FILENAME,
+ true,
+ BaseSinkConfig.FILE_NAME_EXPRESSION,
+ BaseSinkConfig.FILENAME_TIME_FORMAT)
+ .optional(BaseSinkConfig.HAVE_PARTITION)
+ .conditional( // partition options tied to HAVE_PARTITION == true
+ BaseSinkConfig.HAVE_PARTITION,
+ true,
+ BaseSinkConfig.PARTITION_BY,
+ BaseSinkConfig.PARTITION_DIR_EXPRESSION,
+ BaseSinkConfig.IS_PARTITION_FIELD_WRITE_IN_FILE)
+ .optional(BaseSinkConfig.SINK_COLUMNS) // remaining options are independent of format and partitioning
+ .optional(BaseSinkConfig.IS_ENABLE_TRANSACTION)
+ .optional(BaseSinkConfig.DATE_FORMAT)
+ .optional(BaseSinkConfig.DATETIME_FORMAT)
+ .optional(BaseSinkConfig.TIME_FORMAT)
+ .build();
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSource.java
b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSource.java
new file mode 100644
index 0000000000..aefc339121
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSource.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.cos.source;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
+import org.apache.seatunnel.connectors.seatunnel.file.cos.config.CosConf;
+import org.apache.seatunnel.connectors.seatunnel.file.cos.config.CosConfig;
+import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.file.source.BaseFileSource;
+import
org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategyFactory;
+
+import com.google.auto.service.AutoService;
+
+import java.io.IOException;
+
+/**
+ * COS file source. Validates the COS options, lists the files under the configured path and
+ * resolves the row type either from a user-defined schema or from the first listed file.
+ */
+@AutoService(SeaTunnelSource.class)
+public class CosFileSource extends BaseFileSource {
+
+    /** Returns the plugin name declared by {@link FileSystemType#COS}. */
+    @Override
+    public String getPluginName() {
+        return FileSystemType.COS.getFileSystemPluginName();
+    }
+
+    /**
+     * Prepares the source: checks the options, creates the read strategy for the configured file
+     * format, builds the Hadoop configuration, lists the input files and determines the row type.
+     *
+     * @param pluginConfig source options of the job
+     * @throws PrepareFailException declared by the overridden method
+     * @throws FileConnectorException when the option check fails, the file listing fails, a
+     *     user-defined schema is combined with orc/parquet, or no schema is configured and the
+     *     path yields no file to infer one from
+     */
+    @Override
+    public void prepare(Config pluginConfig) throws PrepareFailException {
+        CheckResult result =
+                CheckConfigUtil.checkAllExists(
+                        pluginConfig,
+                        CosConfig.FILE_PATH.key(),
+                        CosConfig.FILE_FORMAT_TYPE.key(),
+                        CosConfig.SECRET_ID.key(),
+                        CosConfig.SECRET_KEY.key(),
+                        CosConfig.REGION.key(),
+                        CosConfig.BUCKET.key());
+        if (!result.isSuccess()) {
+            throw new FileConnectorException(
+                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                    String.format(
+                            "PluginName: %s, PluginType: %s, Message: %s",
+                            getPluginName(), PluginType.SOURCE, result.getMsg()));
+        }
+
+        // Read the format once; it selects the read strategy and the schema handling below.
+        String formatName = pluginConfig.getString(CosConfig.FILE_FORMAT_TYPE.key());
+        readStrategy = ReadStrategyFactory.of(formatName);
+        readStrategy.setPluginConfig(pluginConfig);
+        String path = pluginConfig.getString(CosConfig.FILE_PATH.key());
+        hadoopConf = CosConf.buildWithConfig(pluginConfig);
+        try {
+            filePaths = readStrategy.getFileNamesByPath(hadoopConf, path);
+        } catch (IOException e) {
+            String errorMsg = String.format("Get file list from this path [%s] failed", path);
+            throw new FileConnectorException(
+                    FileConnectorErrorCode.FILE_LIST_GET_FAILED, errorMsg, e);
+        }
+
+        FileFormat fileFormat = FileFormat.valueOf(formatName.toUpperCase());
+        if (pluginConfig.hasPath(CatalogTableUtil.SCHEMA.key())) {
+            // only csv, text, json and excel support a user-defined schema for now
+            switch (fileFormat) {
+                case CSV:
+                case TEXT:
+                case JSON:
+                case EXCEL:
+                    SeaTunnelRowType userDefinedSchema =
+                            CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();
+                    readStrategy.setSeaTunnelRowTypeInfo(userDefinedSchema);
+                    rowType = readStrategy.getActualSeaTunnelRowTypeInfo();
+                    break;
+                case ORC:
+                case PARQUET:
+                    throw new FileConnectorException(
+                            CommonErrorCode.UNSUPPORTED_OPERATION,
+                            "SeaTunnel does not support user-defined schema for [parquet, orc] files");
+                default:
+                    // never got in there
+                    throw new FileConnectorException(
+                            CommonErrorCode.ILLEGAL_ARGUMENT,
+                            "SeaTunnel does not supported this file format");
+            }
+        } else {
+            // The schema is inferred from the first listed file, so at least one file must exist.
+            // Without this guard an empty path surfaces as a bare IndexOutOfBoundsException.
+            if (filePaths == null || filePaths.isEmpty()) {
+                throw new FileConnectorException(
+                        CommonErrorCode.TABLE_SCHEMA_GET_FAILED,
+                        String.format(
+                                "No file found under path [%s], cannot infer the table schema",
+                                path));
+            }
+            try {
+                rowType = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf, filePaths.get(0));
+            } catch (FileConnectorException e) {
+                String errorMsg =
+                        String.format("Get table schema from file [%s] failed", filePaths.get(0));
+                throw new FileConnectorException(
+                        CommonErrorCode.TABLE_SCHEMA_GET_FAILED, errorMsg, e);
+            }
+        }
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSourceFactory.java
b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSourceFactory.java
new file mode 100644
index 0000000000..d0b781f1a1
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSourceFactory.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.cos.source;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
+import org.apache.seatunnel.connectors.seatunnel.file.cos.config.CosConfig;
+
+import com.google.auto.service.AutoService;
+
+import java.util.Arrays;
+
+/**
+ * Factory of the COS file source: exposes the factory identifier, the option rules and the source
+ * class.
+ */
+@AutoService(Factory.class)
+public class CosFileSourceFactory implements TableSourceFactory {
+
+    /**
+     * Returns the COS plugin name.
+     *
+     * <p>This must be {@link FileSystemType#COS}, matching {@code CosFileSource#getPluginName()}
+     * and the COS sink factory; returning the OSS name would register this factory under the
+     * OSS identifier instead of the COS one.
+     */
+    @Override
+    public String factoryIdentifier() {
+        return FileSystemType.COS.getFileSystemPluginName();
+    }
+
+    /**
+     * Declares the options of the COS file source: path, bucket, credentials, region and file
+     * format are required; delimiter and schema are tied to specific file formats; the
+     * partition-parsing and date/time format options are optional.
+     */
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder()
+                .required(CosConfig.FILE_PATH)
+                .required(CosConfig.BUCKET)
+                .required(CosConfig.SECRET_ID)
+                .required(CosConfig.SECRET_KEY)
+                .required(CosConfig.REGION)
+                .required(BaseSourceConfig.FILE_FORMAT_TYPE)
+                .conditional(
+                        BaseSourceConfig.FILE_FORMAT_TYPE,
+                        FileFormat.TEXT,
+                        BaseSourceConfig.DELIMITER)
+                .conditional(
+                        BaseSourceConfig.FILE_FORMAT_TYPE,
+                        Arrays.asList(
+                                FileFormat.TEXT, FileFormat.JSON, FileFormat.EXCEL, FileFormat.CSV),
+                        CatalogTableUtil.SCHEMA)
+                .optional(BaseSourceConfig.PARSE_PARTITION_FROM_PATH)
+                .optional(BaseSourceConfig.DATE_FORMAT)
+                .optional(BaseSourceConfig.DATETIME_FORMAT)
+                .optional(BaseSourceConfig.TIME_FORMAT)
+                .build();
+    }
+
+    /** Returns the source implementation created for this factory. */
+    @Override
+    public Class<? extends SeaTunnelSource> getSourceClass() {
+        return CosFileSource.class;
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/resources/services/org.apache.hadoop.fs.FileSystem
b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/resources/services/org.apache.hadoop.fs.FileSystem
new file mode 100644
index 0000000000..b4ecb7e0c7
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/resources/services/org.apache.hadoop.fs.FileSystem
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.hadoop.fs.CosFileSystem
\ No newline at end of file
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSystemType.java
b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/cos/CosFileFactoryTest.java
similarity index 59%
copy from
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSystemType.java
copy to
seatunnel-connectors-v2/connector-file/connector-file-cos/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/cos/CosFileFactoryTest.java
index 8d50cee469..6691f5b1f2 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSystemType.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/cos/CosFileFactoryTest.java
@@ -15,26 +15,19 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.file.config;
+package org.apache.seatunnel.connectors.seatunnel.file.cos;
-import java.io.Serializable;
+import
org.apache.seatunnel.connectors.seatunnel.file.cos.sink.CosFileSinkFactory;
+import
org.apache.seatunnel.connectors.seatunnel.file.cos.source.CosFileSourceFactory;
-public enum FileSystemType implements Serializable {
- HDFS("HdfsFile"),
- LOCAL("LocalFile"),
- OSS("OssFile"),
- OSS_JINDO("OssJindoFile"),
- FTP("FtpFile"),
- SFTP("SftpFile"),
- S3("S3File");
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
- private final String fileSystemPluginName;
+public class CosFileFactoryTest {
- FileSystemType(String fileSystemPluginName) {
- this.fileSystemPluginName = fileSystemPluginName;
- }
-
- public String getFileSystemPluginName() {
- return fileSystemPluginName;
+ @Test // smoke test: both COS factories must be able to build their option rules
+ void optionRule() {
+ Assertions.assertNotNull((new CosFileSourceFactory()).optionRule()); // source factory yields a non-null rule
+ Assertions.assertNotNull((new CosFileSinkFactory()).optionRule()); // sink factory yields a non-null rule
 }
}
diff --git a/seatunnel-connectors-v2/connector-file/pom.xml
b/seatunnel-connectors-v2/connector-file/pom.xml
index d20e6296cb..4bdfa981ce 100644
--- a/seatunnel-connectors-v2/connector-file/pom.xml
+++ b/seatunnel-connectors-v2/connector-file/pom.xml
@@ -39,6 +39,7 @@
<module>connector-file-sftp</module>
<module>connector-file-s3</module>
<module>connector-file-jindo-oss</module>
+ <module>connector-file-cos</module>
</modules>
<properties>
diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml
index fa1ac63c0b..acc6a4fc32 100644
--- a/seatunnel-dist/pom.xml
+++ b/seatunnel-dist/pom.xml
@@ -278,6 +278,12 @@
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-file-cos</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-file-ftp</artifactId>
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-cos-e2e/pom.xml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-cos-e2e/pom.xml
new file mode 100644
index 0000000000..aa51e1cc82
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-cos-e2e/pom.xml
@@ -0,0 +1,48 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-connector-v2-e2e</artifactId>
+ <version>${revision}</version>
+ </parent>
+
+ <artifactId>connector-file-cos-e2e</artifactId>
+ <name>SeaTunnel : E2E : Connector V2 : File Cos</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-fake</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-file-cos</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-assert</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-cos-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/cos/CosFileIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-cos-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/cos/CosFileIT.java
new file mode 100644
index 0000000000..aaa2c1a276
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-cos-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/cos/CosFileIT.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.file.cos;
+
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+
+import java.io.IOException;
+
+@Disabled
+public class CosFileIT extends TestSuiteBase {
+
+ @TestTemplate
+ public void testCosFileWriteAndRead(TestContainer container)
+ throws IOException, InterruptedException {
+ // test cos excel file
+ Container.ExecResult excelWriteResult =
+ container.executeJob("/excel/fake_to_cos_excel.conf");
+ Assertions.assertEquals(0, excelWriteResult.getExitCode(),
excelWriteResult.getStderr());
+ Container.ExecResult excelReadResult =
+ container.executeJob("/excel/cos_excel_to_assert.conf");
+ Assertions.assertEquals(0, excelReadResult.getExitCode(),
excelReadResult.getStderr());
+
+ // test cos text file
+ Container.ExecResult textWriteResult =
+ container.executeJob("/text/fake_to_cos_file_text.conf");
+ Assertions.assertEquals(0, textWriteResult.getExitCode());
+ Container.ExecResult textReadResult =
+ container.executeJob("/text/cos_file_text_to_assert.conf");
+ Assertions.assertEquals(0, textReadResult.getExitCode());
+
+ // test cos json file
+ Container.ExecResult jsonWriteResult =
+ container.executeJob("/json/fake_to_cos_file_json.conf");
+ Assertions.assertEquals(0, jsonWriteResult.getExitCode());
+ Container.ExecResult jsonReadResult =
+ container.executeJob("/json/cos_file_json_to_assert.conf");
+ Assertions.assertEquals(0, jsonReadResult.getExitCode());
+
+ // test cos orc file
+ Container.ExecResult orcWriteResult =
+ container.executeJob("/orc/fake_to_cos_file_orc.conf");
+ Assertions.assertEquals(0, orcWriteResult.getExitCode());
+ Container.ExecResult orcReadResult =
+ container.executeJob("/orc/cos_file_orc_to_assert.conf");
+ Assertions.assertEquals(0, orcReadResult.getExitCode());
+
+ // test cos parquet file
+ Container.ExecResult parquetWriteResult =
+ container.executeJob("/parquet/fake_to_cos_file_parquet.conf");
+ Assertions.assertEquals(0, parquetWriteResult.getExitCode());
+ Container.ExecResult parquetReadResult =
+
container.executeJob("/parquet/cos_file_parquet_to_assert.conf");
+ Assertions.assertEquals(0, parquetReadResult.getExitCode());
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-cos-e2e/src/test/resources/excel/cos_excel_to_assert.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-cos-e2e/src/test/resources/excel/cos_excel_to_assert.conf
new file mode 100644
index 0000000000..b71709318e
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-cos-e2e/src/test/resources/excel/cos_excel_to_assert.conf
@@ -0,0 +1,116 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+ job.mode = "BATCH"
+}
+
+source {
+ CosFile {
+ path = "/read/excel"
+ bucket = "cosn://seatunnel-test"
+ secret_id = "dummy"
+ secret_key = "dummy"
+ region = "ap-chengdu"
+ result_table_name = "fake"
+ file_format_type = excel
+ delimiter = ;
+ skip_header_row_number = 1
+ schema = {
+ fields {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ c_row = {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ }
+ }
+ }
+ }
+}
+
+sink {
+ Assert {
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 5
+ }
+ ],
+ field_rules = [
+ {
+ field_name = c_string
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_boolean
+ field_type = boolean
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_double
+ field_type = double
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-cos-e2e/src/test/resources/excel/fake_to_cos_excel.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-cos-e2e/src/test/resources/excel/fake_to_cos_excel.conf
new file mode 100644
index 0000000000..4c603f5633
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-cos-e2e/src/test/resources/excel/fake_to_cos_excel.conf
@@ -0,0 +1,82 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ result_table_name = "fake"
+ schema = {
+ fields {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ c_row = {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ }
+ }
+ }
+ }
+}
+
+sink {
+ CosFile {
+ path="/sink/excel"
+ bucket = "cosn://seatunnel-test"
+ secret_id = "dummy"
+ secret_key = "dummy"
+ region = "ap-chengdu"
+ partition_dir_expression = "${k0}=${v0}"
+ is_partition_field_write_in_file = true
+ file_name_expression = "${transactionId}_${now}"
+ file_format_type = "excel"
+ filename_time_format = "yyyy.MM.dd"
+ is_enable_transaction = true
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-cos-e2e/src/test/resources/json/cos_file_json_to_assert.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-cos-e2e/src/test/resources/json/cos_file_json_to_assert.conf
new file mode 100644
index 0000000000..d88761799b
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-cos-e2e/src/test/resources/json/cos_file_json_to_assert.conf
@@ -0,0 +1,114 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+ job.mode = "BATCH"
+}
+
+source {
+ CosFile {
+ path = "/read/json"
+ bucket = "cosn://seatunnel-test"
+ secret_id = "dummy"
+ secret_key = "dummy"
+ region = "ap-chengdu"
+ file_format_type = "json"
+ schema = {
+ fields {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ c_row = {
+ C_MAP = "map<string, string>"
+ C_ARRAY = "array<int>"
+ C_STRING = string
+ C_BOOLEAN = boolean
+ C_TINYINT = tinyint
+ C_SMALLINT = smallint
+ C_INT = int
+ C_BIGINT = bigint
+ C_FLOAT = float
+ C_DOUBLE = double
+ C_BYTES = bytes
+ C_DATE = date
+ C_DECIMAL = "decimal(38, 18)"
+ C_TIMESTAMP = timestamp
+ }
+ }
+ }
+ result_table_name = "fake"
+ }
+}
+
+sink {
+ Assert {
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 5
+ }
+ ],
+ field_rules = [
+ {
+ field_name = c_string
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_boolean
+ field_type = boolean
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_double
+ field_type = double
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-cos-e2e/src/test/resources/json/fake_to_cos_file_json.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-cos-e2e/src/test/resources/json/fake_to_cos_file_json.conf
new file mode 100644
index 0000000000..20f54863d6
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-cos-e2e/src/test/resources/json/fake_to_cos_file_json.conf
@@ -0,0 +1,83 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ c_row = {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ }
+ }
+ }
+ result_table_name = "fake"
+ }
+}
+
+sink {
+ CosFile {
+ path="/sink/json"
+ bucket = "cosn://seatunnel-test"
+ secret_id = "dummy"
+ secret_key = "dummy"
+ region = "ap-chengdu"
+ row_delimiter = "\n"
+ partition_dir_expression = "${k0}=${v0}"
+ is_partition_field_write_in_file = true
+ file_name_expression = "${transactionId}_${now}"
+ file_format_type = "json"
+ filename_time_format = "yyyy.MM.dd"
+ is_enable_transaction = true
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-cos-e2e/src/test/resources/orc/cos_file_orc_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-cos-e2e/src/test/resources/orc/cos_file_orc_to_assert.conf
new file mode 100644
index 0000000000..1041997ed6
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-cos-e2e/src/test/resources/orc/cos_file_orc_to_assert.conf
@@ -0,0 +1,80 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+ job.mode = "BATCH"
+}
+
+source {
+ CosFile {
+ path = "/read/orc"
+ bucket = "cosn://seatunnel-test"
+ secret_id = "dummy"
+ secret_key = "dummy"
+ region = "ap-chengdu"
+ file_format_type = "orc"
+ result_table_name = "fake"
+ }
+}
+
+sink {
+ Assert {
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 5
+ }
+ ],
+ field_rules = [
+ {
+ field_name = c_string
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_boolean
+ field_type = boolean
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_double
+ field_type = double
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-cos-e2e/src/test/resources/orc/fake_to_cos_file_orc.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-cos-e2e/src/test/resources/orc/fake_to_cos_file_orc.conf
new file mode 100644
index 0000000000..879993b4ea
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-cos-e2e/src/test/resources/orc/fake_to_cos_file_orc.conf
@@ -0,0 +1,84 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ c_row = {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ }
+ }
+ }
+ result_table_name = "fake"
+ }
+}
+
+sink {
+ CosFile {
+ path="/sink/orc"
+ bucket = "cosn://seatunnel-test"
+ secret_id = "dummy"
+ secret_key = "dummy"
+ region = "ap-chengdu"
+ row_delimiter = "\n"
+ partition_dir_expression = "${k0}=${v0}"
+ is_partition_field_write_in_file = true
+ file_name_expression = "${transactionId}_${now}"
+ file_format_type = "orc"
+ filename_time_format = "yyyy.MM.dd"
+ is_enable_transaction = true
+ compress_codec = "zlib"
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-cos-e2e/src/test/resources/parquet/cos_file_parquet_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-cos-e2e/src/test/resources/parquet/cos_file_parquet_to_assert.conf
new file mode 100644
index 0000000000..8bf9c171ce
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-cos-e2e/src/test/resources/parquet/cos_file_parquet_to_assert.conf
@@ -0,0 +1,80 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+ job.mode = "BATCH"
+}
+
+source {
+ CosFile {
+ path = "/read/parquet"
+ bucket = "cosn://seatunnel-test"
+ secret_id = "dummy"
+ secret_key = "dummy"
+ region = "ap-chengdu"
+ file_format_type = "parquet"
+ result_table_name = "fake"
+ }
+}
+
+sink {
+ Assert {
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 5
+ }
+ ],
+ field_rules = [
+ {
+ field_name = c_string
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_boolean
+ field_type = boolean
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_double
+ field_type = double
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-cos-e2e/src/test/resources/parquet/fake_to_cos_file_parquet.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-cos-e2e/src/test/resources/parquet/fake_to_cos_file_parquet.conf
new file mode 100644
index 0000000000..bb86e5f8b2
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-cos-e2e/src/test/resources/parquet/fake_to_cos_file_parquet.conf
@@ -0,0 +1,84 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ c_row = {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ }
+ }
+ }
+ result_table_name = "fake"
+ }
+}
+
+sink {
+ CosFile {
+ path="/sink/parquet"
+ bucket = "cosn://seatunnel-test"
+ secret_id = "dummy"
+ secret_key = "dummy"
+ region = "ap-chengdu"
+ row_delimiter = "\n"
+ partition_dir_expression = "${k0}=${v0}"
+ is_partition_field_write_in_file = true
+ file_name_expression = "${transactionId}_${now}"
+ file_format_type = "parquet"
+ filename_time_format = "yyyy.MM.dd"
+ is_enable_transaction = true
+ compress_codec = "gzip"
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-cos-e2e/src/test/resources/text/cos_file_text_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-cos-e2e/src/test/resources/text/cos_file_text_to_assert.conf
new file mode 100644
index 0000000000..d53a046079
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-cos-e2e/src/test/resources/text/cos_file_text_to_assert.conf
@@ -0,0 +1,114 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+ job.mode = "BATCH"
+}
+
+source {
+ CosFile {
+ path = "/read/text"
+ bucket = "cosn://seatunnel-test"
+ secret_id = "dummy"
+ secret_key = "dummy"
+ region = "ap-chengdu"
+ file_format_type = "text"
+ schema = {
+ fields {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ c_row = {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ }
+ }
+ }
+ result_table_name = "fake"
+ }
+}
+
+sink {
+ Assert {
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 5
+ }
+ ],
+ field_rules = [
+ {
+ field_name = c_string
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_boolean
+ field_type = boolean
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_double
+ field_type = double
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-cos-e2e/src/test/resources/text/fake_to_cos_file_text.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-cos-e2e/src/test/resources/text/fake_to_cos_file_text.conf
new file mode 100644
index 0000000000..f93af2e212
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-cos-e2e/src/test/resources/text/fake_to_cos_file_text.conf
@@ -0,0 +1,84 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ c_row = {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ }
+ }
+ }
+ result_table_name = "fake"
+ }
+}
+
+sink {
+ CosFile {
+ path="/sink/text"
+ bucket = "cosn://seatunnel-test"
+ secret_id = "dummy"
+ secret_key = "dummy"
+ region = "ap-chengdu"
+ row_delimiter = "\n"
+ partition_dir_expression = "${k0}=${v0}"
+ is_partition_field_write_in_file = true
+ file_name_expression = "${transactionId}_${now}"
+ file_format_type = "text"
+ filename_time_format = "yyyy.MM.dd"
+ is_enable_transaction = true
+ compress_codec = "lzo"
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
index 65798fee10..8644b551b2 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
@@ -36,6 +36,7 @@
<module>connector-influxdb-e2e</module>
<module>connector-amazondynamodb-e2e</module>
<module>connector-file-local-e2e</module>
+ <module>connector-file-cos-e2e</module>
<module>connector-file-sftp-e2e</module>
<module>connector-cassandra-e2e</module>
<module>connector-neo4j-e2e</module>