This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 275db78918 [Feature][Connector-V2] Support create empty file when no
data (#8543)
275db78918 is described below
commit 275db789187b967824c765f72701eb3b2fc099ab
Author: Jia Fan <[email protected]>
AuthorDate: Fri Jan 17 19:12:44 2025 +0800
[Feature][Connector-V2] Support create empty file when no data (#8543)
---
docs/en/connector-v2/sink/CosFile.md | 1 +
docs/en/connector-v2/sink/FtpFile.md | 1 +
docs/en/connector-v2/sink/HdfsFile.md | 1 +
docs/en/connector-v2/sink/LocalFile.md | 1 +
docs/en/connector-v2/sink/ObsFile.md | 1 +
docs/en/connector-v2/sink/OssFile.md | 1 +
docs/en/connector-v2/sink/OssJindoFile.md | 1 +
docs/en/connector-v2/sink/S3File.md | 1 +
docs/en/connector-v2/sink/SftpFile.md | 1 +
docs/zh/connector-v2/sink/HdfsFile.md | 3 +-
docs/zh/connector-v2/sink/LocalFile.md | 1 +
.../seatunnel/file/config/BaseFileSinkConfig.java | 7 ++
.../seatunnel/file/config/BaseSinkConfig.java | 7 ++
.../seatunnel/file/sink/BaseFileSink.java | 6 ++
.../file/sink/writer/AbstractWriteStrategy.java | 28 ++++++--
.../file/sink/writer/BinaryWriteStrategy.java | 10 ++-
.../file/sink/writer/ExcelWriteStrategy.java | 7 +-
.../file/sink/writer/JsonWriteStrategy.java | 5 +-
.../file/sink/writer/OrcWriteStrategy.java | 7 +-
.../file/sink/writer/ParquetWriteStrategy.java | 7 +-
.../file/sink/writer/TextWriteStrategy.java | 5 +-
.../seatunnel/file/sink/writer/WriteStrategy.java | 5 +-
.../file/sink/writer/XmlWriteStrategy.java | 7 +-
.../file/cos/sink/CosFileSinkFactory.java | 1 +
.../file/ftp/sink/FtpFileSinkFactory.java | 1 +
.../file/hdfs/sink/HdfsFileSinkFactory.java | 1 +
.../file/oss/jindo/sink/OssFileSinkFactory.java | 1 +
.../file/local/sink/LocalFileSinkFactory.java | 1 +
.../seatunnel/file/local/LocalFileTest.java | 77 ++++++++++++++++++++++
.../file/oss/sink/OssFileSinkFactory.java | 1 +
.../seatunnel/file/s3/sink/S3FileSinkFactory.java | 1 +
.../file/sftp/sink/SftpFileSinkFactory.java | 1 +
32 files changed, 175 insertions(+), 24 deletions(-)
diff --git a/docs/en/connector-v2/sink/CosFile.md
b/docs/en/connector-v2/sink/CosFile.md
index db11cfb9af..2441306566 100644
--- a/docs/en/connector-v2/sink/CosFile.md
+++ b/docs/en/connector-v2/sink/CosFile.md
@@ -63,6 +63,7 @@ By default, we use 2PC commit to ensure `exactly-once`
| xml_row_tag | string | no | RECORD
| Only used when file_format is xml.
|
| xml_use_attr_format | boolean | no | -
| Only used when file_format is xml.
|
| single_file_mode | boolean | no | false
| Each parallelism will only output one file. When
this parameter is turned on, batch_size will not take effect. The output file
name does not have a file block suffix. |
+| create_empty_file_when_no_data | boolean | no | false
| When there is no data synchronization upstream,
the corresponding data files are still generated.
|
| parquet_avro_write_timestamp_as_int96 | boolean | no | false
| Only used when file_format is parquet.
|
| parquet_avro_write_fixed_as_int96 | array | no | -
| Only used when file_format is parquet.
|
| encoding | string | no | "UTF-8"
| Only used when file_format_type is
json,text,csv,xml.
|
diff --git a/docs/en/connector-v2/sink/FtpFile.md
b/docs/en/connector-v2/sink/FtpFile.md
index 175d374d9a..47811bdd79 100644
--- a/docs/en/connector-v2/sink/FtpFile.md
+++ b/docs/en/connector-v2/sink/FtpFile.md
@@ -62,6 +62,7 @@ By default, we use 2PC commit to ensure `exactly-once`
| xml_row_tag | string | no | RECORD
| Only used when file_format is xml.
|
| xml_use_attr_format | boolean | no | -
| Only used when file_format is xml.
|
| single_file_mode | boolean | no | false
| Each parallelism will only output one file. When
this parameter is turned on, batch_size will not take effect. The output file
name does not have a file block suffix. |
+| create_empty_file_when_no_data | boolean | no | false
| When there is no data synchronization upstream,
the corresponding data files are still generated.
|
| parquet_avro_write_timestamp_as_int96 | boolean | no | false
| Only used when file_format is parquet.
|
| parquet_avro_write_fixed_as_int96 | array | no | -
| Only used when file_format is parquet.
|
| enable_header_write | boolean | no | false
| Only used when file_format_type is text,csv.<br/>
false:don't write header,true:write header.
|
diff --git a/docs/en/connector-v2/sink/HdfsFile.md
b/docs/en/connector-v2/sink/HdfsFile.md
index 095c32eabc..ae9479aa8f 100644
--- a/docs/en/connector-v2/sink/HdfsFile.md
+++ b/docs/en/connector-v2/sink/HdfsFile.md
@@ -69,6 +69,7 @@ Output data to hdfs file
| xml_row_tag | string | no | RECORD
| Only used when file_format is xml, specifies the
tag name of the data rows within the XML file
[...]
| xml_use_attr_format | boolean | no | -
| Only used when file_format is xml, specifies
Whether to process data using the tag attribute format.
[...]
| single_file_mode | boolean | no | false
| Each parallelism will only output one file. When
this parameter is turned on, batch_size will not take effect. The output file
name does not have a file block suffix.
[...]
+| create_empty_file_when_no_data | boolean | no | false
| When there is no data synchronization upstream,
the corresponding data files are still generated.
[...]
| parquet_avro_write_timestamp_as_int96 | boolean | no | false
| Only used when file_format is parquet.
[...]
| parquet_avro_write_fixed_as_int96 | array | no | -
| Only used when file_format is parquet.
[...]
| enable_header_write | boolean | no | false
| Only used when file_format_type is text,csv.<br/>
false:don't write header,true:write header.
[...]
diff --git a/docs/en/connector-v2/sink/LocalFile.md
b/docs/en/connector-v2/sink/LocalFile.md
index c48394f917..9c2141b61f 100644
--- a/docs/en/connector-v2/sink/LocalFile.md
+++ b/docs/en/connector-v2/sink/LocalFile.md
@@ -58,6 +58,7 @@ By default, we use 2PC commit to ensure `exactly-once`
| xml_row_tag | string | no | RECORD
| Only used when file_format is xml.
|
| xml_use_attr_format | boolean | no | -
| Only used when file_format is xml.
|
| single_file_mode | boolean | no | false
| Each parallelism will only output one file. When
this parameter is turned on, batch_size will not take effect. The output file
name does not have a file block suffix. |
+| create_empty_file_when_no_data | boolean | no | false
| When there is no data synchronization upstream,
the corresponding data files are still generated.
|
| parquet_avro_write_timestamp_as_int96 | boolean | no | false
| Only used when file_format is parquet.
|
| parquet_avro_write_fixed_as_int96 | array | no | -
| Only used when file_format is parquet.
|
| enable_header_write | boolean | no | false
| Only used when file_format_type is text,csv.<br/>
false:don't write header,true:write header.
|
diff --git a/docs/en/connector-v2/sink/ObsFile.md
b/docs/en/connector-v2/sink/ObsFile.md
index 560e7bfb35..aa852c9b70 100644
--- a/docs/en/connector-v2/sink/ObsFile.md
+++ b/docs/en/connector-v2/sink/ObsFile.md
@@ -71,6 +71,7 @@ It only supports hadoop version **2.9.X+**.
| is_enable_transaction | boolean | no | true
| [Tips](#is_enable_transaction)
|
| batch_size | int | no | 1000000
| [Tips](#batch_size)
|
| single_file_mode | boolean | no | false
| Each parallelism will only output one file. When this
parameter is turned on, batch_size will not take effect. The output file name
does not have a file block suffix. |
+| create_empty_file_when_no_data | boolean | no | false
| When there is no data synchronization upstream, the
corresponding data files are still generated.
|
| compress_codec | string | no | none
| [Tips](#compress_codec)
|
| common-options | object | no | -
| [Tips](#common_options)
|
| max_rows_in_memory | int | no | -
| When File Format is Excel,The maximum number of data
items that can be cached in the memory.Only used when file_format is excel.
|
diff --git a/docs/en/connector-v2/sink/OssFile.md
b/docs/en/connector-v2/sink/OssFile.md
index 52da0e83f5..55ef4f0935 100644
--- a/docs/en/connector-v2/sink/OssFile.md
+++ b/docs/en/connector-v2/sink/OssFile.md
@@ -115,6 +115,7 @@ If write to `csv`, `text` file type, All column will be
string.
| xml_row_tag | string | no | RECORD
| Only used when file_format is xml.
|
| xml_use_attr_format | boolean | no | -
| Only used when file_format is xml.
|
| single_file_mode | boolean | no | false
| Each parallelism will only output one file. When
this parameter is turned on, batch_size will not take effect. The output file
name does not have a file block suffix. |
+| create_empty_file_when_no_data | boolean | no | false
| When there is no data synchronization upstream,
the corresponding data files are still generated.
|
| parquet_avro_write_timestamp_as_int96 | boolean | no | false
| Only used when file_format is parquet.
|
| parquet_avro_write_fixed_as_int96 | array | no | -
| Only used when file_format is parquet.
|
| enable_header_write | boolean | no | false
| Only used when file_format_type is text,csv.<br/>
false:don't write header,true:write header.
|
diff --git a/docs/en/connector-v2/sink/OssJindoFile.md
b/docs/en/connector-v2/sink/OssJindoFile.md
index 1a95e81a44..21fe05359e 100644
--- a/docs/en/connector-v2/sink/OssJindoFile.md
+++ b/docs/en/connector-v2/sink/OssJindoFile.md
@@ -67,6 +67,7 @@ By default, we use 2PC commit to ensure `exactly-once`
| xml_row_tag | string | no | RECORD
| Only used when file_format is xml.
|
| xml_use_attr_format | boolean | no | -
| Only used when file_format is xml.
|
| single_file_mode | boolean | no | false
| Each parallelism will only output one file. When
this parameter is turned on, batch_size will not take effect. The output file
name does not have a file block suffix. |
+| create_empty_file_when_no_data | boolean | no | false
| When there is no data synchronization upstream,
the corresponding data files are still generated.
|
| parquet_avro_write_timestamp_as_int96 | boolean | no | false
| Only used when file_format is parquet.
|
| parquet_avro_write_fixed_as_int96 | array | no | -
| Only used when file_format is parquet.
|
| encoding | string | no | "UTF-8"
| Only used when file_format_type is
json,text,csv,xml.
|
diff --git a/docs/en/connector-v2/sink/S3File.md
b/docs/en/connector-v2/sink/S3File.md
index b5fb34e031..b6fbc4ef4e 100644
--- a/docs/en/connector-v2/sink/S3File.md
+++ b/docs/en/connector-v2/sink/S3File.md
@@ -123,6 +123,7 @@ If write to `csv`, `text` file type, All column will be
string.
| xml_row_tag | string | no | RECORD
| Only used when file_format is xml,
specifies the tag name of the data rows within the XML file
|
| xml_use_attr_format | boolean | no | -
| Only used when file_format is xml,
specifies Whether to process data using the tag attribute format.
|
| single_file_mode | boolean | no | false
| Each parallelism will only output one
file. When this parameter is turned on, batch_size will not take effect. The
output file name does not have a file block suffix. |
+| create_empty_file_when_no_data | boolean | no | false
| When there is no data synchronization
upstream, the corresponding data files are still generated.
|
| parquet_avro_write_timestamp_as_int96 | boolean | no | false
| Only used when file_format is parquet.
|
| parquet_avro_write_fixed_as_int96 | array | no | -
| Only used when file_format is parquet.
|
| hadoop_s3_properties | map | no |
| If you need to add a other option, you
could add it here and refer to this
[link](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html)
|
diff --git a/docs/en/connector-v2/sink/SftpFile.md
b/docs/en/connector-v2/sink/SftpFile.md
index dbc8438ae2..4cde1eb866 100644
--- a/docs/en/connector-v2/sink/SftpFile.md
+++ b/docs/en/connector-v2/sink/SftpFile.md
@@ -61,6 +61,7 @@ By default, we use 2PC commit to ensure `exactly-once`
| xml_row_tag | string | no | RECORD
| Only used when file_format is xml.
|
| xml_use_attr_format | boolean | no | -
| Only used when file_format is xml.
|
| single_file_mode | boolean | no | false
| Each parallelism will only output one file. When
this parameter is turned on, batch_size will not take effect. The output file
name does not have a file block suffix. |
+| create_empty_file_when_no_data | boolean | no | false
| When there is no data synchronization upstream,
the corresponding data files are still generated.
|
| parquet_avro_write_timestamp_as_int96 | boolean | no | false
| Only used when file_format is parquet.
|
| enable_header_write | boolean | no | false
| Only used when file_format_type is text,csv.<br/>
false:don't write header,true:write header.
|
| parquet_avro_write_fixed_as_int96 | array | no | -
| Only used when file_format is parquet.
|
diff --git a/docs/zh/connector-v2/sink/HdfsFile.md
b/docs/zh/connector-v2/sink/HdfsFile.md
index c0212ae401..4561eb1572 100644
--- a/docs/zh/connector-v2/sink/HdfsFile.md
+++ b/docs/zh/connector-v2/sink/HdfsFile.md
@@ -56,6 +56,7 @@
| is_enable_transaction | boolean | 否 | true
| 如果 `is_enable_transaction` 为
true,则在将数据写入目标目录时,我们将确保数据不会丢失或重复。请注意,如果 `is_enable_transaction` 为
`true`,我们将在文件头部自动添加 `${transactionId}_`。目前仅支持 `true`。
|
| batch_size | int | 否 | 1000000
| 文件中的最大行数。对于 SeaTunnel Engine,文件中的行数由 `batch_size` 和
`checkpoint.interval` 共同决定。如果 `checkpoint.interval`
的值足够大,则接收器写入器将在文件中写入行,直到文件中的行大于 `batch_size`。如果 `checkpoint.interval`
很小,则接收器写入器将在新检查点触发时创建一个新文件。
|
| single_file_mode | boolean | 否 | false
| 每个并行度只会输出一个文件,当此参数开启时,batch_size就不会生效。输出的文件名没有文件块后缀。
|
+| create_empty_file_when_no_data | boolean | 否 | false
| 当上游没有数据同步时,依然生成对应的数据文件。
|
| compress_codec | string | 否 | none
| 文件的压缩编解码器及其支持的细节如下所示:[txt: `lzo` `none`,json: `lzo`
`none`,csv: `lzo` `none`,orc: `lzo` `snappy` `lz4` `zlib` `none`,parquet: `lzo`
`snappy` `lz4` `gzip` `brotli` `zstd` `none`]。提示:excel类型不支持任何压缩格式。
|
| krb5_path | string | 否 | /etc/krb5.conf
| kerberos 的 krb5 路径
|
| kerberos_principal | string | 否 | -
| kerberos 的主体
|
@@ -65,7 +66,7 @@
| enable_header_write | boolean | 否 | false
| 仅在 file_format_type 为 text,csv 时使用。<br/>
false:不写入表头,true:写入表头。
|
| max_rows_in_memory | int | 否 | -
| 仅当 file_format 为 excel 时使用。当文件格式为 Excel 时,可以缓存在内存中的最大数据项数。
|
| sheet_name | string | 否 | Sheet${Random number}
| 仅当 file_format 为 excel 时使用。将工作簿的表写入指定的表名
|
-| remote_user | string | 否 | -
| Hdfs的远端用户名。
|
+| remote_user | string | 否 | -
| Hdfs的远端用户名。
|
### 提示
diff --git a/docs/zh/connector-v2/sink/LocalFile.md
b/docs/zh/connector-v2/sink/LocalFile.md
index dbd7738488..13cfd3cfbf 100644
--- a/docs/zh/connector-v2/sink/LocalFile.md
+++ b/docs/zh/connector-v2/sink/LocalFile.md
@@ -50,6 +50,7 @@
| is_enable_transaction | boolean | 否 | true
| 是否启用事务
|
| batch_size | int | 否 | 1000000
| 批量大小
|
| single_file_mode | boolean | 否 | false
| 每个并行度只会输出一个文件,当此参数开启时,batch_size就不会生效。输出的文件名没有文件块后缀。
|
+| create_empty_file_when_no_data | boolean | 否 | false
| 当上游没有数据同步时,依然生成对应的数据文件。
|
| compress_codec | string | 否 | none
| 压缩编码
|
| common-options | object | 否 | -
| 常见选项
|
| max_rows_in_memory | int | 否 | -
| 仅在 file_format_type 为 excel 时使用
|
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
index 2957f451b4..bebf4fdbae 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
@@ -44,6 +44,8 @@ public class BaseFileSinkConfig implements DelimiterConfig,
Serializable {
protected String path;
protected String fileNameExpression =
BaseSinkConfig.FILE_NAME_EXPRESSION.defaultValue();
protected boolean singleFileMode =
BaseSinkConfig.SINGLE_FILE_MODE.defaultValue();
+ protected boolean createEmptyFileWhenNoData =
+ BaseSinkConfig.CREATE_EMPTY_FILE_WHEN_NO_DATA.defaultValue();
protected FileFormat fileFormat = FileFormat.TEXT;
protected DateUtils.Formatter dateFormat = DateUtils.Formatter.YYYY_MM_DD;
protected DateTimeUtils.Formatter datetimeFormat =
DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS;
@@ -87,6 +89,11 @@ public class BaseFileSinkConfig implements DelimiterConfig,
Serializable {
this.singleFileMode =
config.getBoolean(BaseSinkConfig.SINGLE_FILE_MODE.key());
}
+ if
(config.hasPath(BaseSinkConfig.CREATE_EMPTY_FILE_WHEN_NO_DATA.key())) {
+ this.createEmptyFileWhenNoData =
+
config.getBoolean(BaseSinkConfig.CREATE_EMPTY_FILE_WHEN_NO_DATA.key());
+ }
+
if (config.hasPath(BaseSinkConfig.FILE_FORMAT_TYPE.key())
&&
!StringUtils.isBlank(config.getString(BaseSinkConfig.FILE_FORMAT_TYPE.key()))) {
this.fileFormat =
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
index 88be37bc10..d2d3c4d0cd 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
@@ -194,6 +194,13 @@ public class BaseSinkConfig extends KerberosConfig {
.withDescription(
"Whether to write all data to a single file in
each parallelism task");
+ public static final Option<Boolean> CREATE_EMPTY_FILE_WHEN_NO_DATA =
+ Options.key("create_empty_file_when_no_data")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Whether to generate an empty file when there is
no data to write");
+
public static final Option<String> FILENAME_TIME_FORMAT =
Options.key("filename_time_format")
.stringType()
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
index a02e41c87b..09bc3ea285 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
@@ -60,6 +60,12 @@ public abstract class BaseFileSink
throw new IllegalArgumentException(
"Single file mode is not supported when checkpoint is
enabled or in streaming mode.");
}
+ if
(pluginConfig.hasPath(BaseSinkConfig.CREATE_EMPTY_FILE_WHEN_NO_DATA.key())
+ &&
pluginConfig.getBoolean(BaseSinkConfig.CREATE_EMPTY_FILE_WHEN_NO_DATA.key())
+ && !fileSinkConfig.getPartitionFieldList().isEmpty()) {
+ throw new IllegalArgumentException(
+ "Generate empty file when no data is not supported when
partition is enabled.");
+ }
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
index e48f8d3729..3ae380d307 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
@@ -44,6 +44,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import lombok.NonNull;
+import lombok.SneakyThrows;
import java.io.File;
import java.io.IOException;
@@ -59,7 +60,7 @@ import java.util.UUID;
import java.util.regex.Matcher;
import java.util.stream.Collectors;
-public abstract class AbstractWriteStrategy implements WriteStrategy {
+public abstract class AbstractWriteStrategy<T> implements WriteStrategy<T> {
protected final Logger log = LoggerFactory.getLogger(this.getClass());
protected final FileSinkConfig fileSinkConfig;
protected final CompressFormat compressFormat;
@@ -248,8 +249,13 @@ public abstract class AbstractWriteStrategy implements
WriteStrategy {
*
* @return the file commit information
*/
+ @SneakyThrows
@Override
public Optional<FileCommitInfo> prepareCommit() {
+ if (this.needMoveFiles.isEmpty() &&
fileSinkConfig.isCreateEmptyFileWhenNoData()) {
+ String filePath = createFilePathWithoutPartition();
+ this.getOrCreateOutputStream(filePath);
+ }
this.finishAndCloseFile();
LinkedHashMap<String, String> commitMap = new
LinkedHashMap<>(this.needMoveFiles);
LinkedHashMap<String, List<String>> copyMap =
@@ -361,10 +367,25 @@ public abstract class AbstractWriteStrategy implements
WriteStrategy {
return String.join(File.separator, strings);
}
+ public String createFilePathWithoutPartition() {
+ return getPathWithPartitionInfo(null, true);
+ }
+
public String getOrCreateFilePathBeingWritten(@NonNull SeaTunnelRow
seaTunnelRow) {
LinkedHashMap<String, List<String>> dataPartitionDirAndValuesMap =
generatorPartitionDir(seaTunnelRow);
- String beingWrittenFileKey =
dataPartitionDirAndValuesMap.keySet().toArray()[0].toString();
+ boolean noPartition =
+ BaseSinkConfig.NON_PARTITION.equals(
+
dataPartitionDirAndValuesMap.keySet().toArray()[0].toString());
+ return getPathWithPartitionInfo(dataPartitionDirAndValuesMap,
noPartition);
+ }
+
+ private String getPathWithPartitionInfo(
+ LinkedHashMap<String, List<String>> dataPartitionDirAndValuesMap,
boolean noPartition) {
+ String beingWrittenFileKey =
+ noPartition
+ ? BaseSinkConfig.NON_PARTITION
+ :
dataPartitionDirAndValuesMap.keySet().toArray()[0].toString();
// get filePath from beingWrittenFile
String beingWrittenFilePath =
beingWrittenFile.get(beingWrittenFileKey);
if (beingWrittenFilePath != null) {
@@ -376,8 +397,7 @@ public abstract class AbstractWriteStrategy implements
WriteStrategy {
};
String newBeingWrittenFilePath = String.join(File.separator,
pathSegments);
beingWrittenFile.put(beingWrittenFileKey, newBeingWrittenFilePath);
- if (!BaseSinkConfig.NON_PARTITION.equals(
-
dataPartitionDirAndValuesMap.keySet().toArray()[0].toString())) {
+ if (!noPartition) {
partitionDirAndValuesMap.putAll(dataPartitionDirAndValuesMap);
}
return newBeingWrittenFilePath;
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/BinaryWriteStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/BinaryWriteStrategy.java
index 06d05d6250..db3f0c1fc2 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/BinaryWriteStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/BinaryWriteStrategy.java
@@ -34,7 +34,7 @@ import java.io.File;
import java.io.IOException;
import java.util.LinkedHashMap;
-public class BinaryWriteStrategy extends AbstractWriteStrategy {
+public class BinaryWriteStrategy extends
AbstractWriteStrategy<FSDataOutputStream> {
private final LinkedHashMap<String, FSDataOutputStream>
beingWrittenOutputStream;
private final LinkedHashMap<String, Long> partIndexMap;
@@ -43,6 +43,11 @@ public class BinaryWriteStrategy extends
AbstractWriteStrategy {
super(fileSinkConfig);
this.beingWrittenOutputStream = new LinkedHashMap<>();
this.partIndexMap = new LinkedHashMap<>();
+ if (fileSinkConfig.isCreateEmptyFileWhenNoData()) {
+ throw new FileConnectorException(
+ FileConnectorErrorCode.FORMAT_NOT_SUPPORT,
+ "BinaryWriteStrategy does not support generating empty
files when no data is written.");
+ }
}
@Override
@@ -88,7 +93,8 @@ public class BinaryWriteStrategy extends
AbstractWriteStrategy {
}
}
- private FSDataOutputStream getOrCreateOutputStream(@NonNull String
filePath) {
+ @Override
+ public FSDataOutputStream getOrCreateOutputStream(@NonNull String
filePath) {
FSDataOutputStream fsDataOutputStream =
beingWrittenOutputStream.get(filePath);
if (fsDataOutputStream == null) {
try {
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ExcelWriteStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ExcelWriteStrategy.java
index 1e4f90aadd..0fa1d260b4 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ExcelWriteStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ExcelWriteStrategy.java
@@ -29,7 +29,7 @@ import lombok.NonNull;
import java.io.IOException;
import java.util.LinkedHashMap;
-public class ExcelWriteStrategy extends AbstractWriteStrategy {
+public class ExcelWriteStrategy extends AbstractWriteStrategy<ExcelGenerator> {
private final LinkedHashMap<String, ExcelGenerator> beingWrittenWriter;
public ExcelWriteStrategy(FileSinkConfig fileSinkConfig) {
@@ -41,7 +41,7 @@ public class ExcelWriteStrategy extends AbstractWriteStrategy
{
public void write(SeaTunnelRow seaTunnelRow) {
super.write(seaTunnelRow);
String filePath = getOrCreateFilePathBeingWritten(seaTunnelRow);
- ExcelGenerator excelGenerator = getOrCreateExcelGenerator(filePath);
+ ExcelGenerator excelGenerator = getOrCreateOutputStream(filePath);
excelGenerator.writeData(seaTunnelRow);
}
@@ -63,7 +63,8 @@ public class ExcelWriteStrategy extends AbstractWriteStrategy
{
beingWrittenWriter.clear();
}
- private ExcelGenerator getOrCreateExcelGenerator(@NonNull String filePath)
{
+ @Override
+ public ExcelGenerator getOrCreateOutputStream(@NonNull String filePath) {
ExcelGenerator excelGenerator = this.beingWrittenWriter.get(filePath);
if (excelGenerator == null) {
excelGenerator =
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java
index 23fb7893a8..bc4a08b2d8 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java
@@ -39,7 +39,7 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
-public class JsonWriteStrategy extends AbstractWriteStrategy {
+public class JsonWriteStrategy extends
AbstractWriteStrategy<FSDataOutputStream> {
private final byte[] rowDelimiter;
private SerializationSchema serializationSchema;
private final LinkedHashMap<String, FSDataOutputStream>
beingWrittenOutputStream;
@@ -111,7 +111,8 @@ public class JsonWriteStrategy extends
AbstractWriteStrategy {
isFirstWrite.clear();
}
- private FSDataOutputStream getOrCreateOutputStream(@NonNull String
filePath) {
+ @Override
+ public FSDataOutputStream getOrCreateOutputStream(@NonNull String
filePath) {
FSDataOutputStream fsDataOutputStream =
beingWrittenOutputStream.get(filePath);
if (fsDataOutputStream == null) {
try {
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
index f6b47ce4d2..366c9bb82a 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
@@ -59,7 +59,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-public class OrcWriteStrategy extends AbstractWriteStrategy {
+public class OrcWriteStrategy extends AbstractWriteStrategy<Writer> {
private final LinkedHashMap<String, Writer> beingWrittenWriter;
public OrcWriteStrategy(FileSinkConfig fileSinkConfig) {
@@ -71,7 +71,7 @@ public class OrcWriteStrategy extends AbstractWriteStrategy {
public void write(@NonNull SeaTunnelRow seaTunnelRow) {
super.write(seaTunnelRow);
String filePath = getOrCreateFilePathBeingWritten(seaTunnelRow);
- Writer writer = getOrCreateWriter(filePath);
+ Writer writer = getOrCreateOutputStream(filePath);
TypeDescription schema = buildSchemaWithRowType();
VectorizedRowBatch rowBatch = schema.createRowBatch();
int i = 0;
@@ -109,7 +109,8 @@ public class OrcWriteStrategy extends AbstractWriteStrategy
{
this.beingWrittenWriter.clear();
}
- private Writer getOrCreateWriter(@NonNull String filePath) {
+ @Override
+ public Writer getOrCreateOutputStream(@NonNull String filePath) {
Writer writer = this.beingWrittenWriter.get(filePath);
if (writer == null) {
TypeDescription schema = buildSchemaWithRowType();
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
index b0f873296c..41155fd829 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
@@ -74,7 +74,7 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
-public class ParquetWriteStrategy extends AbstractWriteStrategy {
+public class ParquetWriteStrategy extends
AbstractWriteStrategy<ParquetWriter<GenericRecord>> {
private final LinkedHashMap<String, ParquetWriter<GenericRecord>>
beingWrittenWriter;
private AvroSchemaConverter schemaConverter;
private Schema schema;
@@ -119,7 +119,7 @@ public class ParquetWriteStrategy extends
AbstractWriteStrategy {
public void write(@NonNull SeaTunnelRow seaTunnelRow) {
super.write(seaTunnelRow);
String filePath = getOrCreateFilePathBeingWritten(seaTunnelRow);
- ParquetWriter<GenericRecord> writer = getOrCreateWriter(filePath);
+ ParquetWriter<GenericRecord> writer =
getOrCreateOutputStream(filePath);
GenericRecordBuilder recordBuilder = new GenericRecordBuilder(schema);
for (Integer integer : sinkColumnsIndexInRow) {
String fieldName = seaTunnelRowType.getFieldName(integer);
@@ -155,7 +155,8 @@ public class ParquetWriteStrategy extends
AbstractWriteStrategy {
this.beingWrittenWriter.clear();
}
- private ParquetWriter<GenericRecord> getOrCreateWriter(@NonNull String
filePath) {
+ @Override
+ public ParquetWriter<GenericRecord> getOrCreateOutputStream(@NonNull
String filePath) {
if (schema == null) {
schema = buildAvroSchemaWithRowType(seaTunnelRowType,
sinkColumnsIndexInRow);
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
index 77e2eb5c5b..262448d195 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
@@ -43,7 +43,7 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
-public class TextWriteStrategy extends AbstractWriteStrategy {
+public class TextWriteStrategy extends
AbstractWriteStrategy<FSDataOutputStream> {
private final LinkedHashMap<String, FSDataOutputStream>
beingWrittenOutputStream;
private final Map<String, Boolean> isFirstWrite;
private final String fieldDelimiter;
@@ -132,7 +132,8 @@ public class TextWriteStrategy extends
AbstractWriteStrategy {
isFirstWrite.clear();
}
- private FSDataOutputStream getOrCreateOutputStream(@NonNull String
filePath) {
+ @Override
+ public FSDataOutputStream getOrCreateOutputStream(@NonNull String
filePath) {
FSDataOutputStream fsDataOutputStream =
beingWrittenOutputStream.get(filePath);
if (fsDataOutputStream == null) {
try {
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java
index 24b23c9bfc..25b84714d4 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java
@@ -27,11 +27,12 @@ import
org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig
import org.apache.hadoop.conf.Configuration;
import java.io.Closeable;
+import java.io.IOException;
import java.io.Serializable;
import java.util.LinkedHashMap;
import java.util.List;
-public interface WriteStrategy extends Transaction, Serializable, Closeable {
+public interface WriteStrategy<T> extends Transaction, Serializable, Closeable
{
/**
* init hadoop conf
*
@@ -70,6 +71,8 @@ public interface WriteStrategy extends Transaction,
Serializable, Closeable {
*/
LinkedHashMap<String, List<String>> generatorPartitionDir(SeaTunnelRow
seaTunnelRow);
+ T getOrCreateOutputStream(String path) throws IOException;
+
/**
* use transaction id generate file name
*
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/XmlWriteStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/XmlWriteStrategy.java
index 74fa220031..adcc57b3dc 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/XmlWriteStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/XmlWriteStrategy.java
@@ -35,7 +35,7 @@ import java.util.LinkedHashMap;
* ensures that each file is written to only once. It writes the data by
passing the data row to the
* corresponding XmlWriter instance.
*/
-public class XmlWriteStrategy extends AbstractWriteStrategy {
+public class XmlWriteStrategy extends AbstractWriteStrategy<XmlWriter> {
private final LinkedHashMap<String, XmlWriter> beingWrittenWriter;
@@ -48,7 +48,7 @@ public class XmlWriteStrategy extends AbstractWriteStrategy {
public void write(SeaTunnelRow seaTunnelRow) throws FileConnectorException
{
super.write(seaTunnelRow);
String filePath = getOrCreateFilePathBeingWritten(seaTunnelRow);
- XmlWriter xmlDocWriter = getOrCreateXmlWriter(filePath);
+ XmlWriter xmlDocWriter = getOrCreateOutputStream(filePath);
xmlDocWriter.writeData(seaTunnelRow);
}
@@ -70,7 +70,8 @@ public class XmlWriteStrategy extends AbstractWriteStrategy {
this.beingWrittenWriter.clear();
}
- private XmlWriter getOrCreateXmlWriter(String filePath) {
+ @Override
+ public XmlWriter getOrCreateOutputStream(String filePath) {
return beingWrittenWriter.computeIfAbsent(
filePath,
k -> new XmlWriter(fileSinkConfig, sinkColumnsIndexInRow,
seaTunnelRowType));
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/sink/CosFileSinkFactory.java
b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/sink/CosFileSinkFactory.java
index b728af4951..b7b6185498 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/sink/CosFileSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/sink/CosFileSinkFactory.java
@@ -93,6 +93,7 @@ public class CosFileSinkFactory implements TableSinkFactory {
.optional(BaseSinkConfig.TIME_FORMAT)
.optional(BaseSinkConfig.SINGLE_FILE_MODE)
.optional(BaseSinkConfig.BATCH_SIZE)
+ .optional(BaseSinkConfig.CREATE_EMPTY_FILE_WHEN_NO_DATA)
.build();
}
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java
index 3dc48bd3bb..0a9dfcb3d7 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java
@@ -107,6 +107,7 @@ public class FtpFileSinkFactory extends
BaseMultipleTableFileSinkFactory {
.optional(FtpConfigOptions.FTP_CONNECTION_MODE)
.optional(BaseSinkConfig.SINGLE_FILE_MODE)
.optional(BaseSinkConfig.BATCH_SIZE)
+ .optional(BaseSinkConfig.CREATE_EMPTY_FILE_WHEN_NO_DATA)
.build();
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSinkFactory.java
b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSinkFactory.java
index b937a8bed0..1f5d98b0be 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSinkFactory.java
@@ -95,6 +95,7 @@ public class HdfsFileSinkFactory implements TableSinkFactory {
.optional(BaseSinkConfig.KERBEROS_KEYTAB_PATH)
.optional(BaseSinkConfig.KRB5_PATH)
.optional(BaseSinkConfig.REMOTE_USER)
+ .optional(BaseSinkConfig.CREATE_EMPTY_FILE_WHEN_NO_DATA)
.build();
}
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/sink/OssFileSinkFactory.java
b/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/sink/OssFileSinkFactory.java
index 7ecbb6c3f1..2064c2937e 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/sink/OssFileSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/sink/OssFileSinkFactory.java
@@ -93,6 +93,7 @@ public class OssFileSinkFactory implements TableSinkFactory {
.optional(BaseSinkConfig.TIME_FORMAT)
.optional(BaseSinkConfig.SINGLE_FILE_MODE)
.optional(BaseSinkConfig.BATCH_SIZE)
+ .optional(BaseSinkConfig.CREATE_EMPTY_FILE_WHEN_NO_DATA)
.build();
}
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java
index 8450f13999..c6e75615be 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java
@@ -100,6 +100,7 @@ public class LocalFileSinkFactory extends
BaseMultipleTableFileSinkFactory {
.optional(BaseSinkConfig.TIME_FORMAT)
.optional(BaseSinkConfig.SINGLE_FILE_MODE)
.optional(BaseSinkConfig.BATCH_SIZE)
+ .optional(BaseSinkConfig.CREATE_EMPTY_FILE_WHEN_NO_DATA)
.build();
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/local/LocalFileTest.java
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/local/LocalFileTest.java
index 8280ec5e2a..f1fffa4ece 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/local/LocalFileTest.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/local/LocalFileTest.java
@@ -25,6 +25,7 @@ import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.utils.FileUtils;
+import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import
org.apache.seatunnel.connectors.seatunnel.file.local.sink.LocalFileSinkFactory;
import org.apache.seatunnel.connectors.seatunnel.sink.SinkFlowTestUtils;
@@ -33,7 +34,9 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;
+import java.io.File;
import java.io.IOException;
+import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -153,4 +156,78 @@ public class LocalFileTest {
FileUtils.getFileLineNumber(
"/tmp/seatunnel/LocalFileTest/only_one_file_1.txt"));
}
+
+ @Test
+ void testCreateEmptyFileWhenNoData() throws IOException {
+ Map<String, Object> options =
+ new HashMap<String, Object>() {
+ {
+ put("path", "/tmp/seatunnel/LocalFileTest");
+ put("row_delimiter", "\n");
+ put("file_name_expression", "empty_file");
+ put("is_enable_transaction", false);
+ put("batch_size", 1);
+ put("create_empty_file_when_no_data", true);
+ }
+ };
+ options.put("file_format_type", "text");
+ FileUtils.deleteFile("/tmp/seatunnel/LocalFileTest");
+ SinkFlowTestUtils.runBatchWithCheckpointDisabled(
+ catalogTable,
+ ReadonlyConfig.fromMap(options),
+ new LocalFileSinkFactory(),
+ Collections.emptyList());
+ Assertions.assertEquals(
+ 0,
+ (long)
+ FileUtils.getFileLineNumber(
+
"/tmp/seatunnel/LocalFileTest/empty_file_0.txt"));
+
+ options.put("file_format_type", "csv");
+ FileUtils.deleteFile("/tmp/seatunnel/LocalFileTest");
+ SinkFlowTestUtils.runBatchWithCheckpointDisabled(
+ catalogTable,
+ ReadonlyConfig.fromMap(options),
+ new LocalFileSinkFactory(),
+ Collections.emptyList());
+ Assertions.assertEquals(
+ 0,
+ (long)
+ FileUtils.getFileLineNumber(
+
"/tmp/seatunnel/LocalFileTest/empty_file_0.csv"));
+
+ options.put("enable_header_write", true);
+ SinkFlowTestUtils.runBatchWithCheckpointDisabled(
+ catalogTable,
+ ReadonlyConfig.fromMap(options),
+ new LocalFileSinkFactory(),
+ Collections.emptyList());
+ Assertions.assertEquals(
+ "test\n",
+ FileUtils.readFileToStr(
+
Paths.get("/tmp/seatunnel/LocalFileTest/empty_file_0.csv")));
+
+ options.put("file_format_type", "parquet");
+ SinkFlowTestUtils.runBatchWithCheckpointDisabled(
+ catalogTable,
+ ReadonlyConfig.fromMap(options),
+ new LocalFileSinkFactory(),
+ Collections.emptyList());
+ Assertions.assertEquals(
+ 300, new
File("/tmp/seatunnel/LocalFileTest/empty_file_0.parquet").length());
+
+ options.put("file_format_type", "binary");
+ FileConnectorException exception =
+ Assertions.assertThrows(
+ FileConnectorException.class,
+ () ->
+
SinkFlowTestUtils.runBatchWithCheckpointDisabled(
+ catalogTable,
+ ReadonlyConfig.fromMap(options),
+ new LocalFileSinkFactory(),
+ Collections.emptyList()));
+ Assertions.assertEquals(
+ "ErrorCode:[FILE-07], ErrorDescription:[Format not support] -
BinaryWriteStrategy does not support generating empty files when no data is
written.",
+ exception.getMessage());
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java
b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java
index 246c769b76..82a062faf1 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java
@@ -105,6 +105,7 @@ public class OssFileSinkFactory extends
BaseMultipleTableFileSinkFactory {
.optional(BaseSinkConfig.TIME_FORMAT)
.optional(BaseSinkConfig.SINGLE_FILE_MODE)
.optional(BaseSinkConfig.BATCH_SIZE)
+ .optional(BaseSinkConfig.CREATE_EMPTY_FILE_WHEN_NO_DATA)
.optional(SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
.build();
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java
index 950582a860..492605b874 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java
@@ -105,6 +105,7 @@ public class S3FileSinkFactory implements TableSinkFactory {
.optional(BaseSinkConfig.TIME_FORMAT)
.optional(BaseSinkConfig.SINGLE_FILE_MODE)
.optional(BaseSinkConfig.BATCH_SIZE)
+ .optional(BaseSinkConfig.CREATE_EMPTY_FILE_WHEN_NO_DATA)
.optional(BaseSinkConfig.TMP_PATH)
.optional(SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
.build();
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSinkFactory.java
b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSinkFactory.java
index 4ff9c6928f..4ab3d18651 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSinkFactory.java
@@ -105,6 +105,7 @@ public class SftpFileSinkFactory extends
BaseMultipleTableFileSinkFactory {
.optional(BaseSinkConfig.TIME_FORMAT)
.optional(BaseSinkConfig.SINGLE_FILE_MODE)
.optional(BaseSinkConfig.BATCH_SIZE)
+ .optional(BaseSinkConfig.CREATE_EMPTY_FILE_WHEN_NO_DATA)
.build();
}