This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new c1401787b3 [Feature][Connector-V2] Supports the transfer of any file (#6826)
c1401787b3 is described below
commit c1401787b3da7482cf7019c34d79faf730299c2e
Author: Jia Fan <[email protected]>
AuthorDate: Tue Jun 4 20:10:54 2024 +0800
[Feature][Connector-V2] Supports the transfer of any file (#6826)
---
docs/en/connector-v2/sink/CosFile.md | 3 +-
docs/en/connector-v2/sink/FtpFile.md | 3 +-
docs/en/connector-v2/sink/HdfsFile.md | 3 +-
docs/en/connector-v2/sink/LocalFile.md | 3 +-
docs/en/connector-v2/sink/OssFile.md | 3 +-
docs/en/connector-v2/sink/OssJindoFile.md | 3 +-
docs/en/connector-v2/sink/S3File.md | 3 +-
docs/en/connector-v2/sink/SftpFile.md | 3 +-
docs/en/connector-v2/source/CosFile.md | 41 ++++++-
docs/en/connector-v2/source/FtpFile.md | 41 ++++++-
docs/en/connector-v2/source/HdfsFile.md | 3 +-
docs/en/connector-v2/source/LocalFile.md | 33 +++++-
docs/en/connector-v2/source/OssFile.md | 3 +-
docs/en/connector-v2/source/OssJindoFile.md | 41 ++++++-
docs/en/connector-v2/source/S3File.md | 3 +-
docs/en/connector-v2/source/SftpFile.md | 8 +-
.../file/hdfs/source/BaseHdfsFileSource.java | 3 +-
.../file/config/BaseFileSourceConfig.java | 1 +
.../seatunnel/file/config/FileFormat.java | 13 +++
.../file/exception/FileConnectorErrorCode.java | 2 +-
.../file/sink/writer/BinaryWriteStrategy.java | 128 +++++++++++++++++++++
.../file/source/reader/BinaryReadStrategy.java | 100 ++++++++++++++++
.../seatunnel/file/cos/source/CosFileSource.java | 3 +-
.../seatunnel/file/ftp/source/FtpFileSource.java | 3 +-
.../file/oss/jindo/source/OssFileSource.java | 3 +-
.../seatunnel/file/sftp/source/SftpFileSource.java | 3 +-
.../e2e/connector/file/local/LocalFileIT.java | 12 ++
.../src/test/resources/binary/cat.png | Bin 0 -> 1969877 bytes
.../binary/local_file_binary_to_assert.conf | 40 +++++++
.../local_file_binary_to_local_file_binary.conf | 34 ++++++
30 files changed, 520 insertions(+), 22 deletions(-)
diff --git a/docs/en/connector-v2/sink/CosFile.md b/docs/en/connector-v2/sink/CosFile.md
index 7d2733e905..4bf812666e 100644
--- a/docs/en/connector-v2/sink/CosFile.md
+++ b/docs/en/connector-v2/sink/CosFile.md
@@ -30,6 +30,7 @@ By default, we use 2PC commit to ensure `exactly-once`
- [x] json
- [x] excel
- [x] xml
+ - [x] binary
## Options
@@ -115,7 +116,7 @@ When the format in the `file_name_expression` parameter is `xxxx-${now}` , `file
We supported as the following file types:
-`text` `json` `csv` `orc` `parquet` `excel` `xml`
+`text` `csv` `parquet` `orc` `json` `excel` `xml` `binary`
Please note that, The final file name will end with the file_format's suffix,
the suffix of the text file is `txt`.
diff --git a/docs/en/connector-v2/sink/FtpFile.md b/docs/en/connector-v2/sink/FtpFile.md
index 8757fd7aa2..3b429dfe04 100644
--- a/docs/en/connector-v2/sink/FtpFile.md
+++ b/docs/en/connector-v2/sink/FtpFile.md
@@ -28,6 +28,7 @@ By default, we use 2PC commit to ensure `exactly-once`
- [x] json
- [x] excel
- [x] xml
+ - [x] binary
## Options
@@ -120,7 +121,7 @@ When the format in the `file_name_expression` parameter is `xxxx-${now}` , `file
We supported as the following file types:
-`text` `json` `csv` `orc` `parquet` `excel` `xml`
+`text` `csv` `parquet` `orc` `json` `excel` `xml` `binary`
Please note that, The final file name will end with the file_format_type's
suffix, the suffix of the text file is `txt`.
diff --git a/docs/en/connector-v2/sink/HdfsFile.md b/docs/en/connector-v2/sink/HdfsFile.md
index 319b9dde14..d138089b2e 100644
--- a/docs/en/connector-v2/sink/HdfsFile.md
+++ b/docs/en/connector-v2/sink/HdfsFile.md
@@ -22,6 +22,7 @@ By default, we use 2PC commit to ensure `exactly-once`
- [x] json
- [x] excel
- [x] xml
+ - [x] binary
- [x] compress codec
- [x] lzo
@@ -46,7 +47,7 @@ Output data to hdfs file
| custom_filename | boolean | no | false | Whether you need custom the filename [...]
| file_name_expression | string | no | "${transactionId}" | Only used when `custom_filename` is `true`.`file_name_expression` describes the file expression which will be created into the `path`. We can add the variable `${now}` or `${uuid}` in the `file_name_expression`, like `test_${uuid}_${now}`,`${now}` represents the current time, and its format can be defined by specifying the option `filename_time_format`.Please note that, If `is_enable_tr [...]
| filename_time_format | string | no | "yyyy.MM.dd" | Only used when `custom_filename` is `true`.When the format in the `file_name_expression` parameter is `xxxx-${now}` , `filename_time_format` can specify the time format of the path, and the default value is `yyyy.MM.dd` . The commonly used time formats are listed as follows:[y:Year,M:Month,d:Day of month,H:Hour in day (0-23),m:Minute in hour,s:Second in minute] [...]
-| file_format_type | string | no | "csv" | We supported as the following file types:`text` `json` `csv` `orc` `parquet` `excel` `xml`.Please note that, The final file name will end with the file_format's suffix, the suffix of the text file is `txt`. [...]
+| file_format_type | string | no | "csv" | We supported as the following file types:`text` `csv` `parquet` `orc` `json` `excel` `xml` `binary`.Please note that, The final file name will end with the file_format's suffix, the suffix of the text file is `txt`. [...]
| field_delimiter | string | no | '\001' | Only used when file_format is text,The separator between columns in a row of data. Only needed by `text` file format. [...]
| row_delimiter | string | no | "\n" | Only used when file_format is text,The separator between rows in a file. Only needed by `text` file format. [...]
| have_partition | boolean | no | false | Whether you need processing partitions. [...]
diff --git a/docs/en/connector-v2/sink/LocalFile.md b/docs/en/connector-v2/sink/LocalFile.md
index 33aae02c66..d2ab6ea8e1 100644
--- a/docs/en/connector-v2/sink/LocalFile.md
+++ b/docs/en/connector-v2/sink/LocalFile.md
@@ -28,6 +28,7 @@ By default, we use 2PC commit to ensure `exactly-once`
- [x] json
- [x] excel
- [x] xml
+ - [x] binary
## Options
@@ -94,7 +95,7 @@ When the format in the `file_name_expression` parameter is `xxxx-${now}` , `file
We supported as the following file types:
-`text` `json` `csv` `orc` `parquet` `excel` `xml`
+`text` `csv` `parquet` `orc` `json` `excel` `xml` `binary`
Please note that, The final file name will end with the file_format_type's
suffix, the suffix of the text file is `txt`.
diff --git a/docs/en/connector-v2/sink/OssFile.md b/docs/en/connector-v2/sink/OssFile.md
index 182c138fba..41dbead736 100644
--- a/docs/en/connector-v2/sink/OssFile.md
+++ b/docs/en/connector-v2/sink/OssFile.md
@@ -33,6 +33,7 @@ By default, we use 2PC commit to ensure `exactly-once`
- [x] json
- [x] excel
- [x] xml
+ - [x] binary
## Data Type Mapping
@@ -166,7 +167,7 @@ When the format in the `file_name_expression` parameter is `xxxx-${Now}` , `file
We supported as the following file types:
-`text` `json` `csv` `orc` `parquet` `excel` `xml`
+`text` `csv` `parquet` `orc` `json` `excel` `xml` `binary`
Please note that, The final file name will end with the file_format_type's
suffix, the suffix of the text file is `txt`.
diff --git a/docs/en/connector-v2/sink/OssJindoFile.md b/docs/en/connector-v2/sink/OssJindoFile.md
index f1ca20ab5c..cab0455613 100644
--- a/docs/en/connector-v2/sink/OssJindoFile.md
+++ b/docs/en/connector-v2/sink/OssJindoFile.md
@@ -34,6 +34,7 @@ By default, we use 2PC commit to ensure `exactly-once`
- [x] json
- [x] excel
- [x] xml
+ - [x] binary
## Options
@@ -119,7 +120,7 @@ When the format in the `file_name_expression` parameter is `xxxx-${now}` , `file
We supported as the following file types:
-`text` `json` `csv` `orc` `parquet` `excel` `xml`
+`text` `csv` `parquet` `orc` `json` `excel` `xml` `binary`
Please note that, The final file name will end with the file_format_type's
suffix, the suffix of the text file is `txt`.
diff --git a/docs/en/connector-v2/sink/S3File.md b/docs/en/connector-v2/sink/S3File.md
index 508524c511..b59bc078e3 100644
--- a/docs/en/connector-v2/sink/S3File.md
+++ b/docs/en/connector-v2/sink/S3File.md
@@ -23,6 +23,7 @@ By default, we use 2PC commit to ensure `exactly-once`
- [x] json
- [x] excel
- [x] xml
+ - [x] binary
## Description
@@ -172,7 +173,7 @@ When the format in the `file_name_expression` parameter is `xxxx-${now}` , `file
We supported as the following file types:
-`text` `json` `csv` `orc` `parquet` `excel` `xml`
+`text` `csv` `parquet` `orc` `json` `excel` `xml` `binary`
Please note that, The final file name will end with the file_format_type's
suffix, the suffix of the text file is `txt`.
diff --git a/docs/en/connector-v2/sink/SftpFile.md b/docs/en/connector-v2/sink/SftpFile.md
index 9169a79b2a..bba650f6a1 100644
--- a/docs/en/connector-v2/sink/SftpFile.md
+++ b/docs/en/connector-v2/sink/SftpFile.md
@@ -28,6 +28,7 @@ By default, we use 2PC commit to ensure `exactly-once`
- [x] json
- [x] excel
- [x] xml
+ - [x] binary
## Options
@@ -113,7 +114,7 @@ When the format in the `file_name_expression` parameter is `xxxx-${now}` , `file
We supported as the following file types:
-`text` `json` `csv` `orc` `parquet` `excel` `xml`
+`text` `csv` `parquet` `orc` `json` `excel` `xml` `binary`
Please note that, The final file name will end with the file_format_type's
suffix, the suffix of the text file is `txt`.
diff --git a/docs/en/connector-v2/source/CosFile.md b/docs/en/connector-v2/source/CosFile.md
index 973ad8b029..05efe7b9fd 100644
--- a/docs/en/connector-v2/source/CosFile.md
+++ b/docs/en/connector-v2/source/CosFile.md
@@ -27,6 +27,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
- [x] json
- [x] excel
- [x] xml
+ - [x] binary
## Description
@@ -76,7 +77,7 @@ The source file path.
File type, supported as the following file types:
-`text` `csv` `parquet` `orc` `json` `excel` `xml`
+`text` `csv` `parquet` `orc` `json` `excel` `xml` `binary`
If you assign file type to `json`, you should also assign schema option to
tell connector how to parse data to the row you want.
@@ -160,6 +161,11 @@ connector will generate data as the following:
|---------------|-----|--------|
| tyrantlucifer | 26 | male |
+If you assign file type to `binary`, SeaTunnel can synchronize files in any format,
+such as compressed packages, pictures, etc. In short, any files can be synchronized to the target place.
+Under this requirement, you need to ensure that the source and sink use `binary` format for file synchronization
+at the same time. You can find the specific usage in the example below.
+
### bucket [string]
The bucket address of Cos file system, for example:
`Cos://tyrantlucifer-image-bed`
@@ -321,6 +327,39 @@ Source plugin common parameters, please refer to [Source Common Options](common-
```
+### Transfer Binary File
+
+```hocon
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ CosFile {
+ bucket = "cosn://seatunnel-test-1259587829"
+ secret_id = "xxxxxxxxxxxxxxxxxxx"
+ secret_key = "xxxxxxxxxxxxxxxxxxx"
+ region = "ap-chengdu"
+ path = "/seatunnel/read/binary/"
+ file_format_type = "binary"
+ }
+}
+sink {
+ // you can transfer local file to s3/hdfs/oss etc.
+ CosFile {
+ bucket = "cosn://seatunnel-test-1259587829"
+ secret_id = "xxxxxxxxxxxxxxxxxxx"
+ secret_key = "xxxxxxxxxxxxxxxxxxx"
+ region = "ap-chengdu"
+ path = "/seatunnel/read/binary2/"
+ file_format_type = "binary"
+ }
+}
+
+```
+
## Changelog
### next version
diff --git a/docs/en/connector-v2/source/FtpFile.md b/docs/en/connector-v2/source/FtpFile.md
index e3df8d2059..e4cff24fc8 100644
--- a/docs/en/connector-v2/source/FtpFile.md
+++ b/docs/en/connector-v2/source/FtpFile.md
@@ -22,6 +22,7 @@
- [x] json
- [x] excel
- [x] xml
+ - [x] binary
## Description
@@ -86,7 +87,7 @@ The source file path.
File type, supported as the following file types:
-`text` `csv` `parquet` `orc` `json` `excel` `xml`
+`text` `csv` `parquet` `orc` `json` `excel` `xml` `binary`
If you assign file type to `json` , you should also assign schema option to
tell connector how to parse data to the row you want.
@@ -159,6 +160,11 @@ connector will generate data as the following:
|---------------|-----|--------|
| tyrantlucifer | 26 | male |
+If you assign file type to `binary`, SeaTunnel can synchronize files in any format,
+such as compressed packages, pictures, etc. In short, any files can be synchronized to the target place.
+Under this requirement, you need to ensure that the source and sink use `binary` format for file synchronization
+at the same time. You can find the specific usage in the example below.
+
### connection_mode [string]
The target ftp connection mode , default is active mode, supported as the
following modes:
@@ -288,6 +294,39 @@ Source plugin common parameters, please refer to [Source Common Options](common-
```
+### Transfer Binary File
+
+```hocon
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FtpFile {
+ host = "192.168.31.48"
+ port = 21
+ user = tyrantlucifer
+ password = tianchao
+ path = "/seatunnel/read/binary/"
+ file_format_type = "binary"
+ }
+}
+sink {
+ // you can transfer local file to s3/hdfs/oss etc.
+ FtpFile {
+ host = "192.168.31.48"
+ port = 21
+ user = tyrantlucifer
+ password = tianchao
+ path = "/seatunnel/read/binary2/"
+ file_format_type = "binary"
+ }
+}
+
+```
+
## Changelog
### 2.2.0-beta 2022-09-26
diff --git a/docs/en/connector-v2/source/HdfsFile.md b/docs/en/connector-v2/source/HdfsFile.md
index 99f70b7694..c37f3fb121 100644
--- a/docs/en/connector-v2/source/HdfsFile.md
+++ b/docs/en/connector-v2/source/HdfsFile.md
@@ -27,6 +27,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
- [x] json
- [x] excel
- [x] xml
+ - [x] binary
## Description
@@ -43,7 +44,7 @@ Read data from hdfs file system.
| Name | Type | Required | Default | Description |
|---------------------------|---------|----------|---------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| path | string | yes | - | The source file path. |
-| file_format_type | string | yes | - | We supported as the following file types:`text` `json` `csv` `orc` `parquet` `excel` `xml`.Please note that, The final file name will end with the file_format's suffix, the suffix of the text file is `txt`. |
+| file_format_type | string | yes | - | We supported as the following file types:`text` `csv` `parquet` `orc` `json` `excel` `xml` `binary`.Please note that, The final file name will end with the file_format's suffix, the suffix of the text file is `txt`. |
| fs.defaultFS | string | yes | - | The hadoop cluster address that start with `hdfs://`, for example: `hdfs://hadoopcluster` |
| read_columns | list | yes | - | The read column list of the data source, user can use it to implement field projection.The file type supported column projection as the following shown:[text,json,csv,orc,parquet,excel,xml].Tips: If the user wants to use this feature when reading `text` `json` `csv` files, the schema option must be configured. |
| hdfs_site_path | string | no | - | The path of `hdfs-site.xml`, used to load ha configuration of namenodes |
diff --git a/docs/en/connector-v2/source/LocalFile.md b/docs/en/connector-v2/source/LocalFile.md
index dc7efc3c13..05d87362f6 100644
--- a/docs/en/connector-v2/source/LocalFile.md
+++ b/docs/en/connector-v2/source/LocalFile.md
@@ -27,6 +27,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
- [x] json
- [x] excel
- [x] xml
+ - [x] binary
## Description
@@ -71,7 +72,7 @@ The source file path.
File type, supported as the following file types:
-`text` `csv` `parquet` `orc` `json` `excel` `xml`
+`text` `csv` `parquet` `orc` `json` `excel` `xml` `binary`
If you assign file type to `json`, you should also assign schema option to
tell connector how to parse data to the row you want.
@@ -155,6 +156,11 @@ connector will generate data as the following:
|---------------|-----|--------|
| tyrantlucifer | 26 | male |
+If you assign file type to `binary`, SeaTunnel can synchronize files in any format,
+such as compressed packages, pictures, etc. In short, any files can be synchronized to the target place.
+Under this requirement, you need to ensure that the source and sink use `binary` format for file synchronization
+at the same time. You can find the specific usage in the example below.
+
### read_columns [list]
The read column list of the data source, user can use it to implement field
projection.
@@ -363,6 +369,31 @@ LocalFile {
```
+### Transfer Binary File
+
+```hocon
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ LocalFile {
+ path = "/seatunnel/read/binary/"
+ file_format_type = "binary"
+ }
+}
+sink {
+ // you can transfer local file to s3/hdfs/oss etc.
+ LocalFile {
+ path = "/seatunnel/read/binary2/"
+ file_format_type = "binary"
+ }
+}
+
+```
+
## Changelog
### 2.2.0-beta 2022-09-26
diff --git a/docs/en/connector-v2/source/OssFile.md b/docs/en/connector-v2/source/OssFile.md
index 62ecf19c3b..3f781eb11a 100644
--- a/docs/en/connector-v2/source/OssFile.md
+++ b/docs/en/connector-v2/source/OssFile.md
@@ -38,6 +38,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
- [x] json
- [x] excel
- [x] xml
+ - [x] binary
## Data Type Mapping
@@ -192,7 +193,7 @@ If you assign file type to `parquet` `orc`, schema option not required, connecto
| name | type | required | default value | Description |
|---------------------------|---------|----------|---------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| path | string | yes | - | The Oss path that needs to be read can have sub paths, but the sub paths need to meet certain format requirements. Specific requirements can be referred to "parse_partition_from_path" option |
-| file_format_type | string | yes | - | File type, supported as the following file types: `text` `csv` `parquet` `orc` `json` `excel` `xml` |
+| file_format_type | string | yes | - | File type, supported as the following file types: `text` `csv` `parquet` `orc` `json` `excel` `xml` `binary` |
| bucket | string | yes | - | The bucket address of oss file system, for example: `oss://seatunnel-test`. |
| endpoint | string | yes | - | fs oss endpoint |
| read_columns | list | no | - | The read column list of the data source, user can use it to implement field projection. The file type supported column projection as the following shown: `text` `csv` `parquet` `orc` `json` `excel` `xml` . If the user wants to use this feature when reading `text` `json` `csv` files, the "schema" option must be configured. |
diff --git a/docs/en/connector-v2/source/OssJindoFile.md b/docs/en/connector-v2/source/OssJindoFile.md
index 3c1847608c..f24ea83f0c 100644
--- a/docs/en/connector-v2/source/OssJindoFile.md
+++ b/docs/en/connector-v2/source/OssJindoFile.md
@@ -27,6 +27,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
- [x] json
- [x] excel
- [x] xml
+ - [x] binary
## Description
@@ -80,7 +81,7 @@ The source file path.
File type, supported as the following file types:
-`text` `csv` `parquet` `orc` `json` `excel` `xml`
+`text` `csv` `parquet` `orc` `json` `excel` `xml` `binary`
If you assign file type to `json`, you should also assign schema option to
tell connector how to parse data to the row you want.
@@ -164,6 +165,11 @@ connector will generate data as the following:
|---------------|-----|--------|
| tyrantlucifer | 26 | male |
+If you assign file type to `binary`, SeaTunnel can synchronize files in any format,
+such as compressed packages, pictures, etc. In short, any files can be synchronized to the target place.
+Under this requirement, you need to ensure that the source and sink use `binary` format for file synchronization
+at the same time. You can find the specific usage in the example below.
+
### bucket [string]
The bucket address of oss file system, for example:
`oss://tyrantlucifer-image-bed`
@@ -313,6 +319,39 @@ OssJindoFile {
```
+### Transfer Binary File
+
+```hocon
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ OssJindoFile {
+ bucket = "oss://tyrantlucifer-image-bed"
+ access_key = "xxxxxxxxxxxxxxxxx"
+ access_secret = "xxxxxxxxxxxxxxxxxxxxxx"
+ endpoint = "oss-cn-beijing.aliyuncs.com"
+ path = "/seatunnel/read/binary/"
+ file_format_type = "binary"
+ }
+}
+sink {
+ // you can transfer local file to s3/hdfs/oss etc.
+ OssJindoFile {
+ bucket = "oss://tyrantlucifer-image-bed"
+ access_key = "xxxxxxxxxxxxxxxxx"
+ access_secret = "xxxxxxxxxxxxxxxxxxxxxx"
+ endpoint = "oss-cn-beijing.aliyuncs.com"
+ path = "/seatunnel/read/binary2/"
+ file_format_type = "binary"
+ }
+}
+
+```
+
## Changelog
### next version
diff --git a/docs/en/connector-v2/source/S3File.md b/docs/en/connector-v2/source/S3File.md
index 40c4e334fd..bf621be590 100644
--- a/docs/en/connector-v2/source/S3File.md
+++ b/docs/en/connector-v2/source/S3File.md
@@ -27,6 +27,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
- [x] json
- [x] excel
- [x] xml
+ - [x] binary
## Description
@@ -198,7 +199,7 @@ If you assign file type to `parquet` `orc`, schema option not required, connecto
| name | type | required | default value | Description [...]
|---------------------------------|---------|----------|-------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- [...]
| path | string | yes | - | The s3 path that needs to be read can have sub paths, but the sub paths need to meet certain format requirements. Specific requirements can be referred to "parse_partition_from_path" option [...]
-| file_format_type | string | yes | - | File type, supported as the following file types: `text` `csv` `parquet` `orc` `json` `excel` `xml` [...]
+| file_format_type | string | yes | - | File type, supported as the following file types: `text` `csv` `parquet` `orc` `json` `excel` `xml` `binary` [...]
| bucket | string | yes | - | The bucket address of s3 file system, for example: `s3n://seatunnel-test`, if you use `s3a` protocol, this parameter should be `s3a://seatunnel-test`. [...]
| fs.s3a.endpoint | string | yes | - | fs s3a endpoint [...]
| fs.s3a.aws.credentials.provider | string | yes | com.amazonaws.auth.InstanceProfileCredentialsProvider | The way to authenticate s3a. We only support `org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider` and `com.amazonaws.auth.InstanceProfileCredentialsProvider` now. More information about the credential provider you can see [Hadoop AWS Document](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#Simple_name.2Fsecret_credentials_with_SimpleAWSCredenti [...]
diff --git a/docs/en/connector-v2/source/SftpFile.md b/docs/en/connector-v2/source/SftpFile.md
index a5a4f22168..dafdb6aba9 100644
--- a/docs/en/connector-v2/source/SftpFile.md
+++ b/docs/en/connector-v2/source/SftpFile.md
@@ -22,6 +22,7 @@
- [x] json
- [x] excel
- [x] xml
+ - [x] binary
## Description
@@ -97,7 +98,7 @@ The File does not have a specific type list, and we can indicate which SeaTunnel
### file_format_type [string]
File type, supported as the following file types:
-`text` `csv` `parquet` `orc` `json` `excel` `xml`
+`text` `csv` `parquet` `orc` `json` `excel` `xml` `binary`
If you assign file type to `json`, you should also assign schema option to
tell connector how to parse data to the row you want.
For example:
upstream data is the following:
@@ -160,6 +161,11 @@ connector will generate data as the following:
|---------------|-----|--------|
| tyrantlucifer | 26 | male |
+If you assign file type to `binary`, SeaTunnel can synchronize files in any format,
+such as compressed packages, pictures, etc. In short, any files can be synchronized to the target place.
+Under this requirement, you need to ensure that the source and sink use `binary` format for file synchronization
+at the same time.
+
### compress_codec [string]
The compress codec of files and the details that supported as the following
shown:
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
index 78f3147b17..9af2721e22 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
@@ -116,9 +116,10 @@ public abstract class BaseHdfsFileSource extends BaseFileSource {
break;
case ORC:
case PARQUET:
+ case BINARY:
throw new FileConnectorException(
CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
- "SeaTunnel does not support user-defined schema
for [parquet, orc] files");
+ "SeaTunnel does not support user-defined schema
for [parquet, orc, binary] files");
default:
// never got in there
throw new FileConnectorException(
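The hunk above extends the schema check in `BaseHdfsFileSource` so that `BINARY` joins `ORC` and `PARQUET` as formats that reject a user-defined schema. A minimal standalone sketch of that guard (illustrative names only, not SeaTunnel's real classes):

```java
// Sketch of the user-schema guard: fixed-layout formats reject a user schema.
enum Fmt { TEXT, JSON, CSV, ORC, PARQUET, BINARY }

class SchemaGuard {
    static void checkUserSchema(Fmt fmt) {
        switch (fmt) {
            case ORC:
            case PARQUET:
            case BINARY:
                // BINARY rows have a fixed internal layout, so a user schema is meaningless.
                throw new UnsupportedOperationException(
                        "SeaTunnel does not support user-defined schema for [parquet, orc, binary] files");
            default:
                break; // text/json/csv readers can map rows onto a user-provided schema
        }
    }
}
```

The same three-way `case` fall-through appears in both `BaseHdfsFileSource` and `BaseFileSourceConfig` in this commit.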
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSourceConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSourceConfig.java
index 5c16a7e28b..373ada564a 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSourceConfig.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSourceConfig.java
@@ -99,6 +99,7 @@ public abstract class BaseFileSourceConfig implements Serializable {
return newCatalogTable(catalogTable,
readStrategy.getActualSeaTunnelRowTypeInfo());
case ORC:
case PARQUET:
+ case BINARY:
return newCatalogTable(
catalogTable,
readStrategy.getSeaTunnelRowTypeInfoWithUserConfigRowType(
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
index 52465fa48a..819dc03e87 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.file.config;
import org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.BinaryWriteStrategy;
import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.ExcelWriteStrategy;
import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.JsonWriteStrategy;
import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.OrcWriteStrategy;
@@ -25,6 +26,7 @@ import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.ParquetWriteSt
import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.TextWriteStrategy;
import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy;
import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.XmlWriteStrategy;
+import org.apache.seatunnel.connectors.seatunnel.file.source.reader.BinaryReadStrategy;
import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ExcelReadStrategy;
import org.apache.seatunnel.connectors.seatunnel.file.source.reader.JsonReadStrategy;
import org.apache.seatunnel.connectors.seatunnel.file.source.reader.OrcReadStrategy;
@@ -113,6 +115,17 @@ public enum FileFormat implements Serializable {
public ReadStrategy getReadStrategy() {
return new XmlReadStrategy();
}
+ },
+ BINARY("") {
+ @Override
+ public WriteStrategy getWriteStrategy(FileSinkConfig fileSinkConfig) {
+ return new BinaryWriteStrategy(fileSinkConfig);
+ }
+
+ @Override
+ public ReadStrategy getReadStrategy() {
+ return new BinaryReadStrategy();
+ }
};
private final String suffix;
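The `BINARY("")` entry above uses the constant-specific-method pattern: each `FileFormat` constant supplies its own read and write strategy factory, and the empty suffix means binary transfer keeps the original file name. A compact sketch of the same pattern with stand-in names (not SeaTunnel's real types):

```java
// Stand-in for SeaTunnel's ReadStrategy; each enum constant supplies its own factory.
interface ReadStrategy { String describe(); }

enum Format {
    TEXT("txt") {
        @Override public ReadStrategy readStrategy() { return () -> "TextReadStrategy"; }
    },
    BINARY("") { // empty suffix: the binary format preserves the source file name
        @Override public ReadStrategy readStrategy() { return () -> "BinaryReadStrategy"; }
    };

    private final String suffix;
    Format(String suffix) { this.suffix = suffix; }
    public String getSuffix() { return suffix; }
    public abstract ReadStrategy readStrategy();
}
```

Callers resolve the strategy purely from the configured `file_format_type`, with no switch statement at the call site.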
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FileConnectorErrorCode.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FileConnectorErrorCode.java
index 27dca4a6bc..f5847aff14 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FileConnectorErrorCode.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FileConnectorErrorCode.java
@@ -28,7 +28,7 @@ public enum FileConnectorErrorCode implements SeaTunnelErrorCode {
FILE_READ_STRATEGY_NOT_SUPPORT("FILE-06", "File strategy not support"),
FORMAT_NOT_SUPPORT("FILE-07", "Format not support"),
FILE_READ_FAILED("FILE-08", "File read failed"),
- ;
+ BINARY_FILE_PART_ORDER_ERROR("FILE-09", "Binary file fragment order abnormality");
private final String code;
private final String description;
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/BinaryWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/BinaryWriteStrategy.java
new file mode 100644
index 0000000000..7f496b2927
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/BinaryWriteStrategy.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink.writer;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonError;
+import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.source.reader.BinaryReadStrategy;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+
+import lombok.NonNull;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.LinkedHashMap;
+
+public class BinaryWriteStrategy extends AbstractWriteStrategy {
+
+ private final LinkedHashMap<String, FSDataOutputStream> beingWrittenOutputStream;
+ private final LinkedHashMap<String, Long> partIndexMap;
+
+ public BinaryWriteStrategy(FileSinkConfig fileSinkConfig) {
+ super(fileSinkConfig);
+ this.beingWrittenOutputStream = new LinkedHashMap<>();
+ this.partIndexMap = new LinkedHashMap<>();
+ }
+
+ @Override
+ public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+ super.setSeaTunnelRowTypeInfo(seaTunnelRowType);
+ if (!seaTunnelRowType.equals(BinaryReadStrategy.binaryRowType)) {
+ throw new FileConnectorException(
+ FileConnectorErrorCode.FORMAT_NOT_SUPPORT,
+ "BinaryWriteStrategy only supports binary format, please read file with `BINARY` format, and do not change schema in the transform.");
+ }
+ }
+
+ @Override
+ public void write(SeaTunnelRow seaTunnelRow) throws FileConnectorException {
+ byte[] data = (byte[]) seaTunnelRow.getField(0);
+ String relativePath = (String) seaTunnelRow.getField(1);
+ long partIndex = (long) seaTunnelRow.getField(2);
+ String filePath = getOrCreateFilePathBeingWritten(relativePath);
+ FSDataOutputStream fsDataOutputStream = getOrCreateOutputStream(filePath);
+ if (partIndex - 1 != partIndexMap.get(filePath)) {
+ throw new FileConnectorException(
+ FileConnectorErrorCode.BINARY_FILE_PART_ORDER_ERROR,
+ "Last order is " + partIndexMap.get(filePath) + ", but get " + partIndex);
+ } else {
+ partIndexMap.put(filePath, partIndex);
+ }
+ try {
+ fsDataOutputStream.write(data);
+ } catch (IOException e) {
+ throw CommonError.fileOperationFailed("BinaryFile", "write", filePath, e);
+ }
+ }
+
+ public String getOrCreateFilePathBeingWritten(String relativePath) {
+ String beingWrittenFilePath = beingWrittenFile.get(relativePath);
+ if (beingWrittenFilePath != null) {
+ return beingWrittenFilePath;
+ } else {
+ String[] pathSegments = new String[] {transactionDirectory, relativePath};
+ String newBeingWrittenFilePath = String.join(File.separator, pathSegments);
+ beingWrittenFile.put(relativePath, newBeingWrittenFilePath);
+ return newBeingWrittenFilePath;
+ }
+ }
+
+ private FSDataOutputStream getOrCreateOutputStream(@NonNull String filePath) {
+ FSDataOutputStream fsDataOutputStream = beingWrittenOutputStream.get(filePath);
+ if (fsDataOutputStream == null) {
+ try {
+ fsDataOutputStream = hadoopFileSystemProxy.getOutputStream(filePath);
+ beingWrittenOutputStream.put(filePath, fsDataOutputStream);
+ partIndexMap.put(filePath, -1L);
+ } catch (IOException e) {
+ throw CommonError.fileOperationFailed("BinaryFile", "open", filePath, e);
+ }
+ }
+ return fsDataOutputStream;
+ }
+
+ @Override
+ public void finishAndCloseFile() {
+ beingWrittenOutputStream.forEach(
+ (key, value) -> {
+ try {
+ value.flush();
+ } catch (IOException e) {
+ throw new FileConnectorException(
+ CommonErrorCodeDeprecated.FLUSH_DATA_FAILED,
+ String.format("Flush data to this file [%s] failed", key),
+ e);
+ } finally {
+ try {
+ value.close();
+ } catch (IOException e) {
+ log.error("error when close output stream {}", key, e);
+ }
+ }
+ needMoveFiles.put(key, getTargetLocation(key));
+ });
+ beingWrittenOutputStream.clear();
+ partIndexMap.clear();
+ }
+}
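For readers skimming the diff, the core contract enforced by `BinaryWriteStrategy.write` above is that each file's fragments must arrive strictly in order (0, 1, 2, ...) or the FILE-09 error is raised. A minimal standalone sketch of that ordering check, with no Hadoop dependency (the class and method names here are illustrative, not part of the connector):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch of the partIndex ordering check in BinaryWriteStrategy:
// the last accepted index per file starts at -1, and each new fragment must
// be exactly last + 1, mirroring the partIndexMap logic in the diff.
public class PartOrderCheck {
    private final Map<String, Long> partIndexMap = new LinkedHashMap<>();

    /** Returns true if the fragment is the expected next part for this file. */
    public boolean accept(String filePath, long partIndex) {
        long last = partIndexMap.getOrDefault(filePath, -1L);
        if (partIndex - 1 != last) {
            return false; // out-of-order fragment; the real code throws FILE-09 here
        }
        partIndexMap.put(filePath, partIndex);
        return true;
    }

    public static void main(String[] args) {
        PartOrderCheck check = new PartOrderCheck();
        System.out.println(check.accept("cat.png", 0)); // true
        System.out.println(check.accept("cat.png", 1)); // true
        System.out.println(check.accept("cat.png", 3)); // false: part 2 is missing
    }
}
```

This is why the sink refuses any transform that reorders or filters rows: a single dropped fragment makes every later fragment out of order.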
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/BinaryReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/BinaryReadStrategy.java
new file mode 100644
index 0000000000..3bbb90c774
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/BinaryReadStrategy.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.source.reader;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+
+/** Used to read file to binary stream */
+public class BinaryReadStrategy extends AbstractReadStrategy {
+
+ public static SeaTunnelRowType binaryRowType =
+ new SeaTunnelRowType(
+ new String[] {"data", "relativePath", "partIndex"},
+ new SeaTunnelDataType[] {
+ PrimitiveByteArrayType.INSTANCE, BasicType.STRING_TYPE, BasicType.LONG_TYPE
+ });
+
+ private File basePath;
+
+ @Override
+ public void init(HadoopConf conf) {
+ super.init(conf);
+ basePath = new File(pluginConfig.getString(BaseSourceConfigOptions.FILE_PATH.key()));
+ }
+
+ @Override
+ public void read(String path, String tableId, Collector<SeaTunnelRow> output)
+ throws IOException, FileConnectorException {
+ try (InputStream inputStream = hadoopFileSystemProxy.getInputStream(path)) {
+ String relativePath;
+ if (basePath.isFile()) {
+ relativePath = basePath.getName();
+ } else {
+ relativePath =
+ path.substring(
+ path.indexOf(basePath.getAbsolutePath())
+ + basePath.getAbsolutePath().length());
+ if (relativePath.startsWith(File.separator)) {
+ relativePath = relativePath.substring(File.separator.length());
+ }
+ }
+ // TODO config this size
+ int maxSize = 1024;
+ byte[] buffer = new byte[maxSize];
+ long partIndex = 0;
+ int readSize;
+ while ((readSize = inputStream.read(buffer)) != -1) {
+ if (readSize != maxSize) {
+ buffer = Arrays.copyOf(buffer, readSize);
+ }
+ SeaTunnelRow row = new SeaTunnelRow(new Object[] {buffer, relativePath, partIndex});
+ buffer = new byte[1024];
+ output.collect(row);
+ partIndex++;
+ }
+ }
+ }
+
+ /**
+ * Returns a fixed SeaTunnelRowType used to store file fragments.
+ *
+ * <p>`data`: Holds the binary data of the file fragment. When the data is empty, it indicates
+ * the end of the file.
+ *
+ * <p>`relativePath`: Represents the sub-path of the file.
+ *
+ * <p>`partIndex`: Indicates the order of the file fragment.
+ */
+ @Override
+ public SeaTunnelRowType getSeaTunnelRowTypeInfo(String path) throws FileConnectorException {
+ return binaryRowType;
+ }
+}
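The read loop above is plain chunking: fill a 1024-byte buffer, trim the final partial read with `Arrays.copyOf`, and emit one row per fragment. A self-contained sketch of just that loop, without the Hadoop proxy or the row type (class names here are illustrative):

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative sketch of the fragment loop in BinaryReadStrategy.read:
// read up to MAX_SIZE bytes at a time, trimming the last buffer so the
// final fragment carries no padding bytes.
public class FragmentReader {
    static final int MAX_SIZE = 1024; // hard-coded in the diff (marked TODO: config this size)

    public static List<byte[]> readFragments(InputStream in) throws IOException {
        List<byte[]> fragments = new ArrayList<>();
        byte[] buffer = new byte[MAX_SIZE];
        int readSize;
        while ((readSize = in.read(buffer)) != -1) {
            // InputStream.read may return fewer bytes than requested;
            // copyOf keeps exactly what was read.
            fragments.add(Arrays.copyOf(buffer, readSize));
            buffer = new byte[MAX_SIZE];
        }
        return fragments;
    }

    public static void main(String[] args) throws IOException {
        byte[] data = new byte[2500]; // two full 1024-byte fragments plus a 452-byte tail
        List<byte[]> parts = readFragments(new ByteArrayInputStream(data));
        System.out.println(parts.size());        // 3
        System.out.println(parts.get(2).length); // 452
    }
}
```

Allocating a fresh buffer per fragment, as the diff does, matters because each emitted row keeps a reference to its `byte[]`; reusing one buffer would corrupt earlier rows.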
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSource.java
index 476a3878fe..0690b2aceb 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSource.java
@@ -102,9 +102,10 @@ public class CosFileSource extends BaseFileSource {
break;
case ORC:
case PARQUET:
+ case BINARY:
throw new FileConnectorException(
CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
- "SeaTunnel does not support user-defined schema for [parquet, orc] files");
+ "SeaTunnel does not support user-defined schema for [parquet, orc, binary] files");
default:
// never got in there
throw new FileConnectorException(
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
index 19ea0c0ba0..b032717cab 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
@@ -107,9 +107,10 @@ public class FtpFileSource extends BaseFileSource {
break;
case ORC:
case PARQUET:
+ case BINARY:
throw new FileConnectorException(
CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
- "SeaTunnel does not support user-defined schema for [parquet, orc] files");
+ "SeaTunnel does not support user-defined schema for [parquet, orc, binary] files");
default:
// never got in there
throw new FileConnectorException(
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/source/OssFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/source/OssFileSource.java
index 6eea28eda3..335e396780 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/source/OssFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/source/OssFileSource.java
@@ -103,9 +103,10 @@ public class OssFileSource extends BaseFileSource {
break;
case ORC:
case PARQUET:
+ case BINARY:
throw new OssJindoConnectorException(
CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
- "SeaTunnel does not support user-defined schema for [parquet, orc] files");
+ "SeaTunnel does not support user-defined schema for [parquet, orc, binary] files");
default:
// never got in there
throw new OssJindoConnectorException(
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java
index 0ccee6c281..d20823adc1 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java
@@ -107,9 +107,10 @@ public class SftpFileSource extends BaseFileSource {
break;
case ORC:
case PARQUET:
+ case BINARY:
throw new FileConnectorException(
CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
- "SeaTunnel does not support user-defined schema for [parquet, orc] files");
+ "SeaTunnel does not support user-defined schema for [parquet, orc, binary] files");
default:
// never got in there
throw new FileConnectorException(
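The four source diffs above (COS, FTP, Jindo OSS, SFTP) make the identical change: `BINARY` joins `ORC` and `PARQUET` as a format that rejects a user-defined schema, because its row layout is fixed by the connector. A condensed sketch of that rule (enum and method names are illustrative, not the connector's API):

```java
// Illustrative sketch of the schema-validation rule added to the four sources:
// formats whose schema is fixed (by the file itself, or by the connector's
// fixed binary row type) cannot accept a user-defined schema.
public class SchemaCheck {
    enum FileFormat { TEXT, CSV, JSON, EXCEL, ORC, PARQUET, BINARY }

    static boolean supportsUserDefinedSchema(FileFormat format) {
        switch (format) {
            case ORC:
            case PARQUET:
            case BINARY:
                return false; // the real sources throw UNSUPPORTED_OPERATION here
            default:
                return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(supportsUserDefinedSchema(FileFormat.BINARY)); // false
        System.out.println(supportsUserDefinedSchema(FileFormat.JSON));   // true
    }
}
```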
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
index 46213dde0f..51494b5059 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.e2e.connector.file.local;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.container.TestContainerId;
import org.apache.seatunnel.e2e.common.container.TestHelper;
@@ -106,6 +107,9 @@ public class LocalFileIT extends TestSuiteBase {
"/seatunnel/read/parquet/name=tyrantlucifer/hobby=coding/e2e.parquet",
container);
+ ContainerUtil.copyFileIntoContainers(
+ "/binary/cat.png", "/seatunnel/read/binary/cat.png", container);
+
ContainerUtil.copyFileIntoContainers(
"/excel/e2e.xlsx",
"/seatunnel/read/excel_filter/name=tyrantlucifer/hobby=coding/e2e_filter.xlsx",
@@ -167,6 +171,14 @@ public class LocalFileIT extends TestSuiteBase {
// test read empty directory
helper.execute("/json/local_file_to_console.conf");
helper.execute("/parquet/local_file_to_console.conf");
+
+ // test binary file
+ helper.execute("/binary/local_file_binary_to_local_file_binary.conf");
+ if (!container.identifier().getEngineType().equals(EngineType.FLINK)) {
+ // the file generated by local_file_binary_to_local_file_binary in taskManager, so read
+ // from jobManager will be failed in Flink
+ helper.execute("/binary/local_file_binary_to_assert.conf");
+ }
}
private Path convertToLzoFile(File file) throws IOException {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/binary/cat.png b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/binary/cat.png
new file mode 100644
index 0000000000..fb39446a11
Binary files /dev/null and b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/binary/cat.png differ
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/binary/local_file_binary_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/binary/local_file_binary_to_assert.conf
new file mode 100644
index 0000000000..c66d53f280
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/binary/local_file_binary_to_assert.conf
@@ -0,0 +1,40 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ LocalFile {
+ path = "/seatunnel/read/binary2/"
+ file_format_type = "binary"
+ }
+}
+sink {
+ Assert {
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 1924
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file
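The `rule_value = 1924` in the assert config above follows from the fragment size: `cat.png` is 1,969,877 bytes (per the diffstat), and the reader emits one row per 1024-byte fragment, so the expected row count is the ceiling of the division. A quick check of that arithmetic:

```java
// Why MAX_ROW is 1924: the binary reader splits cat.png into 1024-byte
// fragments, one row per fragment, so rows = ceil(fileSize / chunkSize).
public class ExpectedRows {
    public static void main(String[] args) {
        long fileSize = 1_969_877L; // size of cat.png from the commit's diffstat
        long chunk = 1024L;         // fragment size hard-coded in BinaryReadStrategy
        long rows = (fileSize + chunk - 1) / chunk; // ceiling division
        System.out.println(rows); // 1924
    }
}
```

If the fragment size ever becomes configurable (as the in-code TODO suggests), this assertion would need to change with it.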
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/binary/local_file_binary_to_local_file_binary.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/binary/local_file_binary_to_local_file_binary.conf
new file mode 100644
index 0000000000..05337374c2
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/binary/local_file_binary_to_local_file_binary.conf
@@ -0,0 +1,34 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ LocalFile {
+ path = "/seatunnel/read/binary/"
+ file_format_type = "binary"
+ }
+}
+sink {
+ LocalFile {
+ path = "/seatunnel/read/binary2/"
+ file_format_type = "binary"
+ }
+}
\ No newline at end of file