This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 40097d7f3e [Doc][Connector-V2] Update save mode config for OssFileSink
(#9303)
40097d7f3e is described below
commit 40097d7f3e3f4eb73285eff79ff70dce8430bab7
Author: Zzih <[email protected]>
AuthorDate: Tue May 13 17:27:46 2025 +0800
[Doc][Connector-V2] Update save mode config for OssFileSink (#9303)
---
docs/en/connector-v2/sink/OssFile.md | 29 +++-
docs/zh/connector-v2/sink/OssFile.md | 175 ++++++++++++---------
.../file/oss/sink/OssFileSinkFactory.java | 2 +
3 files changed, 131 insertions(+), 75 deletions(-)
diff --git a/docs/en/connector-v2/sink/OssFile.md
b/docs/en/connector-v2/sink/OssFile.md
index 6974118065..016e2a6a7c 100644
--- a/docs/en/connector-v2/sink/OssFile.md
+++ b/docs/en/connector-v2/sink/OssFile.md
@@ -102,7 +102,7 @@ If write to `csv`, `text`, `json` file type, All column
will be string.
| file_format_type | string | no | "csv"
|
|
| filename_extension | string | no | -
| Override the default file name extensions with
custom file name extensions. E.g. `.xml`, `.json`, `dat`, `.customtype`
|
| field_delimiter | string | no | '\001'
| Only used when file_format_type is text
|
-| row_delimiter | string | no | "\n"
| Only used when file_format_type is `text`, `csv`
and `json`
|
+| row_delimiter | string | no | "\n"
| Only used when file_format_type is `text`, `csv`
and `json`
|
| have_partition | boolean | no | false
| Whether you need processing partitions.
|
| partition_by | array | no | -
| Only used then have_partition is true
|
| partition_dir_expression | string | no |
"${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | Only used then have_partition is
true
|
@@ -124,6 +124,8 @@ If write to `csv`, `text`, `json` file type, All column
will be string.
| parquet_avro_write_fixed_as_int96 | array | no | -
| Only used when file_format is parquet.
|
| enable_header_write | boolean | no | false
| Only used when file_format_type is text,csv.<br/>
false:don't write header,true:write header.
|
| encoding | string | no | "UTF-8"
| Only used when file_format_type is
json,text,csv,xml.
|
+| schema_save_mode | Enum | no |
CREATE_SCHEMA_WHEN_NOT_EXIST | Before turning on the synchronous
task, do different treatment of the target path
|
+| data_save_mode | Enum | no | APPEND_DATA
| Before opening the synchronous task, the data file
in the target path is differently processed
|
### path [string]
@@ -289,6 +291,23 @@ Support writing Parquet INT96 from a 12-byte field, only
valid for parquet files
Only used when file_format_type is json,text,csv,xml.
The encoding of the file to write. This param will be parsed by
`Charset.forName(encoding)`.
+### schema_save_mode[Enum]
+
+Before turning on the synchronous task, do different treatment of the target
path.
+Option introduction:
+`RECREATE_SCHEMA` :Will be created when the path does not exist. If the path
already exists, delete the path and recreate it.
+`CREATE_SCHEMA_WHEN_NOT_EXIST` :Will Created when the path does not exist, use
the path when the path is existed.
+`ERROR_WHEN_SCHEMA_NOT_EXIST` :Error will be reported when the path does not
exist
+`IGNORE` :Ignore the treatment of the table
+
+### data_save_mode[Enum]
+
+Before opening the synchronous task, the data file in the target path is
differently processed.
+Option introduction:
+`DROP_DATA`: use the path but delete data files in the path.
+`APPEND_DATA`:use the path, and add new files in the path for write data.
+`ERROR_WHEN_DATA_EXISTS`:When there are some data files in the path, an error
will is reported.
+
## How to Create an Oss Data Synchronization Jobs
The following example demonstrates how to create a data synchronization job
that reads data from Fake Source and writes
@@ -335,6 +354,8 @@ sink {
filename_time_format = "yyyy.MM.dd"
sink_columns = ["name","age"]
is_enable_transaction = true
+ schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
+ data_save_mode="APPEND_DATA"
}
}
```
@@ -374,6 +395,8 @@ sink {
is_partition_field_write_in_file = true
file_format_type = "parquet"
sink_columns = ["name","age"]
+ schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
+ data_save_mode="APPEND_DATA"
}
}
```
@@ -408,6 +431,8 @@ sink {
access_secret = "xxxxxxxxxxx"
endpoint = "oss-cn-beijing.aliyuncs.com"
file_format_type = "orc"
+ schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
+ data_save_mode="APPEND_DATA"
}
}
```
@@ -529,6 +554,8 @@ sink {
filename_time_format = "yyyy.MM.dd"
is_enable_transaction = true
compress_codec = "lzo"
+ schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
+ data_save_mode="APPEND_DATA"
}
}
```
diff --git a/docs/zh/connector-v2/sink/OssFile.md
b/docs/zh/connector-v2/sink/OssFile.md
index 9ca9827ca4..e9f056cd43 100644
--- a/docs/zh/connector-v2/sink/OssFile.md
+++ b/docs/zh/connector-v2/sink/OssFile.md
@@ -44,86 +44,88 @@ import ChangeLog from '../changelog/connector-file-oss.md';
### Orc 文件类型
-| SeaTunnel 数据类型 | Orc 数据类型 |
-|----------------------|-----------------------|
-| STRING | STRING |
-| BOOLEAN | BOOLEAN |
-| TINYINT | BYTE |
-| SMALLINT | SHORT |
-| INT | INT |
-| BIGINT | LONG |
-| FLOAT | FLOAT |
-| FLOAT | FLOAT |
-| DOUBLE | DOUBLE |
-| DECIMAL | DECIMAL |
-| BYTES | BINARY |
-| DATE | DATE |
-| TIME <br/> TIMESTAMP | TIMESTAMP |
-| ROW | STRUCT |
-| NULL | 不支持的数据类型 |
-| ARRAY | LIST |
-| Map | Map |
+| SeaTunnel 数据类型 | Orc 数据类型 |
+|----------------------|-----------|
+| STRING | STRING |
+| BOOLEAN | BOOLEAN |
+| TINYINT | BYTE |
+| SMALLINT | SHORT |
+| INT | INT |
+| BIGINT | LONG |
+| FLOAT | FLOAT |
+| FLOAT | FLOAT |
+| DOUBLE | DOUBLE |
+| DECIMAL | DECIMAL |
+| BYTES | BINARY |
+| DATE | DATE |
+| TIME <br/> TIMESTAMP | TIMESTAMP |
+| ROW | STRUCT |
+| NULL | 不支持的数据类型 |
+| ARRAY | LIST |
+| Map | Map |
### Parquet 文件类型
-| SeaTunnel 数据类型 | Parquet 数据类型 |
-|----------------------|-----------------------|
-| STRING | STRING |
-| BOOLEAN | BOOLEAN |
-| TINYINT | INT_8 |
-| SMALLINT | INT_16 |
-| INT | INT32 |
-| BIGINT | INT64 |
-| FLOAT | FLOAT |
-| FLOAT | FLOAT |
-| DOUBLE | DOUBLE |
-| DECIMAL | DECIMAL |
-| BYTES | BINARY |
-| DATE | DATE |
-| TIME <br/> TIMESTAMP | TIMESTAMP_MILLIS |
-| ROW | GroupType |
-| NULL | 不支持的数据类型 |
-| ARRAY | LIST |
-| Map | Map |
+| SeaTunnel 数据类型 | Parquet 数据类型 |
+|----------------------|------------------|
+| STRING | STRING |
+| BOOLEAN | BOOLEAN |
+| TINYINT | INT_8 |
+| SMALLINT | INT_16 |
+| INT | INT32 |
+| BIGINT | INT64 |
+| FLOAT | FLOAT |
+| FLOAT | FLOAT |
+| DOUBLE | DOUBLE |
+| DECIMAL | DECIMAL |
+| BYTES | BINARY |
+| DATE | DATE |
+| TIME <br/> TIMESTAMP | TIMESTAMP_MILLIS |
+| ROW | GroupType |
+| NULL | 不支持的数据类型 |
+| ARRAY | LIST |
+| Map | Map |
## 选项
-| 名称 | 类型 | 必需 | 默认值
| 描述 |
-|---------------------------------------|---------|----------|--------------------------------------------|-------------------------------------------------------|
-| path | string | 是 | 写入文件的oss路径。
| |
-| tmp_path | string | 否 | /tmp/seatunnel
|
结果文件将首先写入tmp路径,然后使用`mv`将tmp-dir提交到目标dir。因此需要一个OSS目录。 |
-| bucket | string | 是 | -
|
|
-| access_key | string | 是 | -
|
|
-| access_secret | string | 是 | -
|
|
-| endpoint | string | 是 | -
|
|
-| custom_filename | boolean | 否 | false
| 是否需要自定义文件名
|
-| file_name_expression | string | 否 |
"${transactionId}" | 仅在custom_filename为true时使用
|
-| filename_time_format | string | 否 | "yyyy.MM.dd"
| 仅在custom_filename为true时使用
|
-| file_format_type | string | 否 | "csv"
|
|
-| field_delimiter | string | 否 | '\001'
| 仅当file_format_type为文本时使用
|
-| row_delimiter | string | 否 | "\n"
| 仅当file_format_type为 `text`、`csv`、`json` 时使用
|
-| have_partition | boolean | 否 | false
| 是否需要处理分区。
|
-| partition_by | array | 否 | -
| 只有在have_partition为true时才使用
|
-| partition_dir_expression | string | 否 |
"${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | 只有在have_partition为true时才使用
|
-| is_partition_field_write_in_file | boolean | 否 | false
| 只有在have_partition为true时才使用
|
-| sink_columns | array | 否 |
| 当此参数为空时,所有字段都是接收列
|
-| is_enable_transaction | boolean | 否 | true
|
|
-| batch_size | int | 否 | 1000000
|
|
-| compress_codec | string | 否 | none
|
|
-| common-options | object | 否 | -
|
|
-| max_rows_in_memory | int | 否 | -
| 仅当file_format_type为excel时使用。
|
-| sheet_name | string | 否 | Sheet${Random
number} | 仅当file_format_type为excel时使用。
|
-| csv_string_quote_mode | enum | 否 | MINIMAL
| 仅在file_format为csv时使用。
|
-| xml_root_tag | string | 否 | RECORDS
| 仅在file_format为xml时使用。
|
-| xml_row_tag | string | 否 | RECORD
| 仅在file_format为xml时使用。
|
-| xml_use_attr_format | boolean | 否 | -
| 仅在file_format为xml时使用。
|
-| single_file_mode | boolean | 否 | false
| 每个并行处理只会输出一个文件。启用此参数后,batch_size将不会生效。输出文件名没有文件块后缀。
|
-| create_empty_file_when_no_data | boolean | 否 | false
| 当上游没有数据同步时,仍然会生成相应的数据文件。
|
-| parquet_avro_write_timestamp_as_int96 | boolean | 否 | false
| 仅在file_format为parquet时使用。
|
-| parquet_avro_write_fixed_as_int96 | array | 否 | -
| 仅在file_format为parquet时使用。
|
-| enable_header_write | boolean | 否 | false
|
仅当file_format_type为文本、csv时使用<br/>false:不写标头,true:写标头。 |
-| encoding | string | 否 | "UTF-8"
| 仅当file_format_type为json、text、csv、xml时使用。
|
+| 名称 | 类型 | 必需 | 默认值
| 描述 |
+|---------------------------------------|---------|----|--------------------------------------------|-------------------------------------------------------|
+| path | string | 是 | 写入文件的oss路径。
| |
+| tmp_path | string | 否 | /tmp/seatunnel
| 结果文件将首先写入tmp路径,然后使用`mv`将tmp-dir提交到目标dir。因此需要一个OSS目录。 |
+| bucket | string | 是 | -
| |
+| access_key | string | 是 | -
| |
+| access_secret | string | 是 | -
| |
+| endpoint | string | 是 | -
| |
+| custom_filename | boolean | 否 | false
| 是否需要自定义文件名 |
+| file_name_expression | string | 否 | "${transactionId}"
| 仅在custom_filename为true时使用 |
+| filename_time_format | string | 否 | "yyyy.MM.dd"
| 仅在custom_filename为true时使用 |
+| file_format_type | string | 否 | "csv"
| |
+| field_delimiter | string | 否 | '\001'
| 仅当file_format_type为文本时使用 |
+| row_delimiter | string | 否 | "\n"
| 仅当file_format_type为 `text`、`csv`、`json` 时使用 |
+| have_partition | boolean | 否 | false
| 是否需要处理分区。 |
+| partition_by | array | 否 | -
| 只有在have_partition为true时才使用 |
+| partition_dir_expression | string | 否 |
"${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | 只有在have_partition为true时才使用
|
+| is_partition_field_write_in_file | boolean | 否 | false
| 只有在have_partition为true时才使用 |
+| sink_columns | array | 否 |
| 当此参数为空时,所有字段都是接收列 |
+| is_enable_transaction | boolean | 否 | true
| |
+| batch_size | int | 否 | 1000000
| |
+| compress_codec | string | 否 | none
| |
+| common-options | object | 否 | -
| |
+| max_rows_in_memory | int | 否 | -
| 仅当file_format_type为excel时使用。 |
+| sheet_name | string | 否 | Sheet${Random number}
| 仅当file_format_type为excel时使用。 |
+| csv_string_quote_mode | enum | 否 | MINIMAL
| 仅在file_format为csv时使用。 |
+| xml_root_tag | string | 否 | RECORDS
| 仅在file_format为xml时使用。 |
+| xml_row_tag | string | 否 | RECORD
| 仅在file_format为xml时使用。 |
+| xml_use_attr_format | boolean | 否 | -
| 仅在file_format为xml时使用。 |
+| single_file_mode | boolean | 否 | false
| 每个并行处理只会输出一个文件。启用此参数后,batch_size将不会生效。输出文件名没有文件块后缀。 |
+| create_empty_file_when_no_data | boolean | 否 | false
| 当上游没有数据同步时,仍然会生成相应的数据文件。 |
+| parquet_avro_write_timestamp_as_int96 | boolean | 否 | false
| 仅在file_format为parquet时使用。 |
+| parquet_avro_write_fixed_as_int96 | array | 否 | -
| 仅在file_format为parquet时使用。 |
+| enable_header_write | boolean | 否 | false
| 仅当file_format_type为文本、csv时使用<br/>false:不写标头,true:写标头。 |
+| encoding | string | 否 | "UTF-8"
| 仅当file_format_type为json、text、csv、xml时使用。 |
+| schema_save_mode | Enum | 否 |
CREATE_SCHEMA_WHEN_NOT_EXIST | 在开启同步任务之前,对目标路径进行不同的处理
|
+| data_save_mode | Enum | 否 | APPEND_DATA
| 在开启同步任务之前,对目标路径中的数据文件进行不同的处理 |
### path [string]
@@ -288,6 +290,23 @@ Sink插件常用参数,请参考[Sink common Options](../Sink common Options
仅当file_format_type为json、text、csv、xml时使用。
要写入的文件的编码。此参数将由`Charset.forName(encoding)`解析。
+### schema_save_mode[Enum]
+
+在开启同步任务之前,对目标路径进行不同的处理。
+选项介绍:
+`RECREATE_SCHEMA` :当路径不存在时创建。如果路径已存在,则删除路径并重新创建。
+`CREATE_SCHEMA_WHEN_NOT_EXIST` :当路径不存在时创建,路径存在时使用路径。
+`ERROR_WHEN_SCHEMA_NOT_EXIST` :当路径不存在时报错
+`IGNORE` :忽略表的处理
+
+### data_save_mode[Enum]
+
+在开启同步任务之前,对目标路径中的数据文件进行不同的处理。
+选项介绍:
+`DROP_DATA`:使用路径但删除路径中的数据文件。
+`APPEND_DATA`:使用路径,并在路径中添加新文件以写入数据。
+`ERROR_WHEN_DATA_EXISTS`:当路径中存在数据文件时,将报错。
+
## 如何创建Oss数据同步作业
@@ -335,6 +354,8 @@ sink {
filename_time_format = "yyyy.MM.dd"
sink_columns = ["name","age"]
is_enable_transaction = true
+ schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
+ data_save_mode="APPEND_DATA"
}
}
```
@@ -374,6 +395,8 @@ sink {
is_partition_field_write_in_file = true
file_format_type = "parquet"
sink_columns = ["name","age"]
+ schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
+ data_save_mode="APPEND_DATA"
}
}
```
@@ -408,6 +431,8 @@ sink {
access_secret = "xxxxxxxxxxx"
endpoint = "oss-cn-beijing.aliyuncs.com"
file_format_type = "orc"
+ schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
+ data_save_mode="APPEND_DATA"
}
}
```
@@ -528,6 +553,8 @@ sink {
filename_time_format = "yyyy.MM.dd"
is_enable_transaction = true
compress_codec = "lzo"
+ schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
+ data_save_mode="APPEND_DATA"
}
}
```
@@ -538,4 +565,4 @@ sink {
## 变更日志
-<ChangeLog />
\ No newline at end of file
+<ChangeLog />
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java
b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java
index 426ca3cd81..056c8767a9 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java
@@ -57,6 +57,8 @@ public class OssFileSinkFactory extends
BaseMultipleTableFileSinkFactory {
.required(OssFileSinkOptions.ACCESS_KEY)
.required(OssFileSinkOptions.ACCESS_SECRET)
.required(OssFileSinkOptions.ENDPOINT)
+ .optional(FileBaseSinkOptions.SCHEMA_SAVE_MODE)
+ .optional(FileBaseSinkOptions.DATA_SAVE_MODE)
.optional(FileBaseSinkOptions.FILE_FORMAT_TYPE)
.conditional(
FileBaseSinkOptions.FILE_FORMAT_TYPE,