This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 275db78918 [Feature][Connector-V2] Support create emtpy file when no 
data (#8543)
275db78918 is described below

commit 275db789187b967824c765f72701eb3b2fc099ab
Author: Jia Fan <[email protected]>
AuthorDate: Fri Jan 17 19:12:44 2025 +0800

    [Feature][Connector-V2] Support create emtpy file when no data (#8543)
---
 docs/en/connector-v2/sink/CosFile.md               |  1 +
 docs/en/connector-v2/sink/FtpFile.md               |  1 +
 docs/en/connector-v2/sink/HdfsFile.md              |  1 +
 docs/en/connector-v2/sink/LocalFile.md             |  1 +
 docs/en/connector-v2/sink/ObsFile.md               |  1 +
 docs/en/connector-v2/sink/OssFile.md               |  1 +
 docs/en/connector-v2/sink/OssJindoFile.md          |  1 +
 docs/en/connector-v2/sink/S3File.md                |  1 +
 docs/en/connector-v2/sink/SftpFile.md              |  1 +
 docs/zh/connector-v2/sink/HdfsFile.md              |  3 +-
 docs/zh/connector-v2/sink/LocalFile.md             |  1 +
 .../seatunnel/file/config/BaseFileSinkConfig.java  |  7 ++
 .../seatunnel/file/config/BaseSinkConfig.java      |  7 ++
 .../seatunnel/file/sink/BaseFileSink.java          |  6 ++
 .../file/sink/writer/AbstractWriteStrategy.java    | 28 ++++++--
 .../file/sink/writer/BinaryWriteStrategy.java      | 10 ++-
 .../file/sink/writer/ExcelWriteStrategy.java       |  7 +-
 .../file/sink/writer/JsonWriteStrategy.java        |  5 +-
 .../file/sink/writer/OrcWriteStrategy.java         |  7 +-
 .../file/sink/writer/ParquetWriteStrategy.java     |  7 +-
 .../file/sink/writer/TextWriteStrategy.java        |  5 +-
 .../seatunnel/file/sink/writer/WriteStrategy.java  |  5 +-
 .../file/sink/writer/XmlWriteStrategy.java         |  7 +-
 .../file/cos/sink/CosFileSinkFactory.java          |  1 +
 .../file/ftp/sink/FtpFileSinkFactory.java          |  1 +
 .../file/hdfs/sink/HdfsFileSinkFactory.java        |  1 +
 .../file/oss/jindo/sink/OssFileSinkFactory.java    |  1 +
 .../file/local/sink/LocalFileSinkFactory.java      |  1 +
 .../seatunnel/file/local/LocalFileTest.java        | 77 ++++++++++++++++++++++
 .../file/oss/sink/OssFileSinkFactory.java          |  1 +
 .../seatunnel/file/s3/sink/S3FileSinkFactory.java  |  1 +
 .../file/sftp/sink/SftpFileSinkFactory.java        |  1 +
 32 files changed, 175 insertions(+), 24 deletions(-)

diff --git a/docs/en/connector-v2/sink/CosFile.md 
b/docs/en/connector-v2/sink/CosFile.md
index db11cfb9af..2441306566 100644
--- a/docs/en/connector-v2/sink/CosFile.md
+++ b/docs/en/connector-v2/sink/CosFile.md
@@ -63,6 +63,7 @@ By default, we use 2PC commit to ensure `exactly-once`
 | xml_row_tag                           | string  | no       | RECORD          
                           | Only used when file_format is xml.                 
                                                                                
                                    |
 | xml_use_attr_format                   | boolean | no       | -               
                           | Only used when file_format is xml.                 
                                                                                
                                    |
 | single_file_mode                      | boolean | no       | false           
                           | Each parallelism will only output one file. When 
this parameter is turned on, batch_size will not take effect. The output file 
name does not have a file block suffix. |
+| create_empty_file_when_no_data        | boolean | no       | false           
                           | When there is no data synchronization upstream, 
the corresponding data files are still generated.                               
                                       |
 | parquet_avro_write_timestamp_as_int96 | boolean | no       | false           
                           | Only used when file_format is parquet.             
                                                                                
                                    |
 | parquet_avro_write_fixed_as_int96     | array   | no       | -               
                           | Only used when file_format is parquet.             
                                                                                
                                    |
 | encoding                              | string  | no       | "UTF-8"         
                           | Only used when file_format_type is 
json,text,csv,xml.                                                              
                                                    |
diff --git a/docs/en/connector-v2/sink/FtpFile.md 
b/docs/en/connector-v2/sink/FtpFile.md
index 175d374d9a..47811bdd79 100644
--- a/docs/en/connector-v2/sink/FtpFile.md
+++ b/docs/en/connector-v2/sink/FtpFile.md
@@ -62,6 +62,7 @@ By default, we use 2PC commit to ensure `exactly-once`
 | xml_row_tag                           | string  | no       | RECORD          
                           | Only used when file_format is xml.                 
                                                                                
                                    |
 | xml_use_attr_format                   | boolean | no       | -               
                           | Only used when file_format is xml.                 
                                                                                
                                    |
 | single_file_mode                      | boolean | no       | false           
                           | Each parallelism will only output one file. When 
this parameter is turned on, batch_size will not take effect. The output file 
name does not have a file block suffix. |
+| create_empty_file_when_no_data        | boolean | no       | false           
                           | When there is no data synchronization upstream, 
the corresponding data files are still generated.                               
                                       |
 | parquet_avro_write_timestamp_as_int96 | boolean | no       | false           
                           | Only used when file_format is parquet.             
                                                                                
                                    |
 | parquet_avro_write_fixed_as_int96     | array   | no       | -               
                           | Only used when file_format is parquet.             
                                                                                
                                    |
 | enable_header_write                   | boolean | no       | false           
                           | Only used when file_format_type is text,csv.<br/> 
false:don't write header,true:write header.                                     
                                     |
diff --git a/docs/en/connector-v2/sink/HdfsFile.md 
b/docs/en/connector-v2/sink/HdfsFile.md
index 095c32eabc..ae9479aa8f 100644
--- a/docs/en/connector-v2/sink/HdfsFile.md
+++ b/docs/en/connector-v2/sink/HdfsFile.md
@@ -69,6 +69,7 @@ Output data to hdfs file
 | xml_row_tag                           | string  | no       | RECORD          
                           | Only used when file_format is xml, specifies the 
tag name of the data rows within the XML file                                   
                                                                                
                                                                                
                                                                                
                [...]
 | xml_use_attr_format                   | boolean | no       | -               
                           | Only used when file_format is xml, specifies 
Whether to process data using the tag attribute format.                         
                                                                                
                                                                                
                                                                                
                    [...]
 | single_file_mode                      | boolean | no       | false           
                           | Each parallelism will only output one file. When 
this parameter is turned on, batch_size will not take effect. The output file 
name does not have a file block suffix.                                         
                                                                                
                                                                                
                  [...]
+| create_empty_file_when_no_data        | boolean | no       | false           
                           | When there is no data synchronization upstream, 
the corresponding data files are still generated.                               
                                                                                
                                                                                
                                                                                
                 [...]
 | parquet_avro_write_timestamp_as_int96 | boolean | no       | false           
                           | Only used when file_format is parquet.             
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
 | parquet_avro_write_fixed_as_int96     | array   | no       | -               
                           | Only used when file_format is parquet.             
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
 | enable_header_write                   | boolean | no       | false           
                           | Only used when file_format_type is text,csv.<br/> 
false:don't write header,true:write header.                                     
                                                                                
                                                                                
                                                                                
               [...]
diff --git a/docs/en/connector-v2/sink/LocalFile.md 
b/docs/en/connector-v2/sink/LocalFile.md
index c48394f917..9c2141b61f 100644
--- a/docs/en/connector-v2/sink/LocalFile.md
+++ b/docs/en/connector-v2/sink/LocalFile.md
@@ -58,6 +58,7 @@ By default, we use 2PC commit to ensure `exactly-once`
 | xml_row_tag                           | string  | no       | RECORD          
                           | Only used when file_format is xml.                 
                                                                                
                                    |
 | xml_use_attr_format                   | boolean | no       | -               
                           | Only used when file_format is xml.                 
                                                                                
                                    |
 | single_file_mode                      | boolean | no       | false           
                           | Each parallelism will only output one file. When 
this parameter is turned on, batch_size will not take effect. The output file 
name does not have a file block suffix. |
+| create_empty_file_when_no_data        | boolean | no       | false           
                           | When there is no data synchronization upstream, 
the corresponding data files are still generated.                               
                                       |
 | parquet_avro_write_timestamp_as_int96 | boolean | no       | false           
                           | Only used when file_format is parquet.             
                                                                                
                                    |
 | parquet_avro_write_fixed_as_int96     | array   | no       | -               
                           | Only used when file_format is parquet.             
                                                                                
                                    |
 | enable_header_write                   | boolean | no       | false           
                           | Only used when file_format_type is text,csv.<br/> 
false:don't write header,true:write header.                                     
                                     |
diff --git a/docs/en/connector-v2/sink/ObsFile.md 
b/docs/en/connector-v2/sink/ObsFile.md
index 560e7bfb35..aa852c9b70 100644
--- a/docs/en/connector-v2/sink/ObsFile.md
+++ b/docs/en/connector-v2/sink/ObsFile.md
@@ -71,6 +71,7 @@ It only supports hadoop version **2.9.X+**.
 | is_enable_transaction            | boolean | no       | true                 
                      | [Tips](#is_enable_transaction)                          
                                                                                
                               |
 | batch_size                       | int     | no       | 1000000              
                      | [Tips](#batch_size)                                     
                                                                                
                               |
 | single_file_mode                 | boolean | no       | false                
                      | Each parallelism will only output one file. When this 
parameter is turned on, batch_size will not take effect. The output file name 
does not have a file block suffix. |
+| create_empty_file_when_no_data   | boolean | no       | false                
                      | When there is no data synchronization upstream, the 
corresponding data files are still generated.                                   
                                   |
 | compress_codec                   | string  | no       | none                 
                      | [Tips](#compress_codec)                                 
                                                                                
                               |
 | common-options                   | object  | no       | -                    
                      | [Tips](#common_options)                                 
                                                                                
                               |
 | max_rows_in_memory               | int     | no       | -                    
                      | When File Format is Excel,The maximum number of data 
items that can be cached in the memory.Only used when file_format is excel.     
                                  |
diff --git a/docs/en/connector-v2/sink/OssFile.md 
b/docs/en/connector-v2/sink/OssFile.md
index 52da0e83f5..55ef4f0935 100644
--- a/docs/en/connector-v2/sink/OssFile.md
+++ b/docs/en/connector-v2/sink/OssFile.md
@@ -115,6 +115,7 @@ If write to `csv`, `text` file type, All column will be 
string.
 | xml_row_tag                           | string  | no       | RECORD          
                           | Only used when file_format is xml.                 
                                                                                
                                    |
 | xml_use_attr_format                   | boolean | no       | -               
                           | Only used when file_format is xml.                 
                                                                                
                                    |
 | single_file_mode                      | boolean | no       | false           
                           | Each parallelism will only output one file. When 
this parameter is turned on, batch_size will not take effect. The output file 
name does not have a file block suffix. |
+| create_empty_file_when_no_data        | boolean | no       | false           
                           | When there is no data synchronization upstream, 
the corresponding data files are still generated.                               
                                       |
 | parquet_avro_write_timestamp_as_int96 | boolean | no       | false           
                           | Only used when file_format is parquet.             
                                                                                
                                    |
 | parquet_avro_write_fixed_as_int96     | array   | no       | -               
                           | Only used when file_format is parquet.             
                                                                                
                                    |
 | enable_header_write                   | boolean | no       | false           
                           | Only used when file_format_type is text,csv.<br/> 
false:don't write header,true:write header.                                     
                                     |
diff --git a/docs/en/connector-v2/sink/OssJindoFile.md 
b/docs/en/connector-v2/sink/OssJindoFile.md
index 1a95e81a44..21fe05359e 100644
--- a/docs/en/connector-v2/sink/OssJindoFile.md
+++ b/docs/en/connector-v2/sink/OssJindoFile.md
@@ -67,6 +67,7 @@ By default, we use 2PC commit to ensure `exactly-once`
 | xml_row_tag                           | string  | no       | RECORD          
                           | Only used when file_format is xml.                 
                                                                                
                                    |
 | xml_use_attr_format                   | boolean | no       | -               
                           | Only used when file_format is xml.                 
                                                                                
                                    |
 | single_file_mode                      | boolean | no       | false           
                           | Each parallelism will only output one file. When 
this parameter is turned on, batch_size will not take effect. The output file 
name does not have a file block suffix. |
+| create_empty_file_when_no_data        | boolean | no       | false           
                           | When there is no data synchronization upstream, 
the corresponding data files are still generated.                               
                                       |
 | parquet_avro_write_timestamp_as_int96 | boolean | no       | false           
                           | Only used when file_format is parquet.             
                                                                                
                                    |
 | parquet_avro_write_fixed_as_int96     | array   | no       | -               
                           | Only used when file_format is parquet.             
                                                                                
                                    |
 | encoding                              | string  | no       | "UTF-8"         
                           | Only used when file_format_type is 
json,text,csv,xml.                                                              
                                                    |
diff --git a/docs/en/connector-v2/sink/S3File.md 
b/docs/en/connector-v2/sink/S3File.md
index b5fb34e031..b6fbc4ef4e 100644
--- a/docs/en/connector-v2/sink/S3File.md
+++ b/docs/en/connector-v2/sink/S3File.md
@@ -123,6 +123,7 @@ If write to `csv`, `text` file type, All column will be 
string.
 | xml_row_tag                           | string  | no       | RECORD          
                                      | Only used when file_format is xml, 
specifies the tag name of the data rows within the XML file                     
                                                    |
 | xml_use_attr_format                   | boolean | no       | -               
                                      | Only used when file_format is xml, 
specifies Whether to process data using the tag attribute format.               
                                                    |
 | single_file_mode                      | boolean | no       | false           
                                      | Each parallelism will only output one 
file. When this parameter is turned on, batch_size will not take effect. The 
output file name does not have a file block suffix. |
+| create_empty_file_when_no_data        | boolean | no       | false           
                                      | When there is no data synchronization 
upstream, the corresponding data files are still generated.                     
                                                 |
 | parquet_avro_write_timestamp_as_int96 | boolean | no       | false           
                                      | Only used when file_format is parquet.  
                                                                                
                                               |
 | parquet_avro_write_fixed_as_int96     | array   | no       | -               
                                      | Only used when file_format is parquet.  
                                                                                
                                               |
 | hadoop_s3_properties                  | map     | no       |                 
                                      | If you need to add a other option, you 
could add it here and refer to this 
[link](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html)
        |
diff --git a/docs/en/connector-v2/sink/SftpFile.md 
b/docs/en/connector-v2/sink/SftpFile.md
index dbc8438ae2..4cde1eb866 100644
--- a/docs/en/connector-v2/sink/SftpFile.md
+++ b/docs/en/connector-v2/sink/SftpFile.md
@@ -61,6 +61,7 @@ By default, we use 2PC commit to ensure `exactly-once`
 | xml_row_tag                           | string  | no       | RECORD          
                           | Only used when file_format is xml.                 
                                                                                
                                    |
 | xml_use_attr_format                   | boolean | no       | -               
                           | Only used when file_format is xml.                 
                                                                                
                                    |
 | single_file_mode                      | boolean | no       | false           
                           | Each parallelism will only output one file. When 
this parameter is turned on, batch_size will not take effect. The output file 
name does not have a file block suffix. |
+| create_empty_file_when_no_data        | boolean | no       | false           
                           | When there is no data synchronization upstream, 
the corresponding data files are still generated.                               
                                       |
 | parquet_avro_write_timestamp_as_int96 | boolean | no       | false           
                           | Only used when file_format is parquet.             
                                                                                
                                    |
 | enable_header_write                   | boolean | no       | false           
                           | Only used when file_format_type is text,csv.<br/> 
false:don't write header,true:write header.                                     
                                     |
 | parquet_avro_write_fixed_as_int96     | array   | no       | -               
                           | Only used when file_format is parquet.             
                                                                                
                                    |
diff --git a/docs/zh/connector-v2/sink/HdfsFile.md 
b/docs/zh/connector-v2/sink/HdfsFile.md
index c0212ae401..4561eb1572 100644
--- a/docs/zh/connector-v2/sink/HdfsFile.md
+++ b/docs/zh/connector-v2/sink/HdfsFile.md
@@ -56,6 +56,7 @@
 | is_enable_transaction            | boolean | 否    | true                     
                  | 如果 `is_enable_transaction` 为 
true,则在将数据写入目标目录时,我们将确保数据不会丢失或重复。请注意,如果 `is_enable_transaction` 为 
`true`,我们将在文件头部自动添加 `${transactionId}_`。目前仅支持 `true`。                           
                                                                                
                                  |
 | batch_size                       | int     | 否    | 1000000                  
                  | 文件中的最大行数。对于 SeaTunnel Engine,文件中的行数由 `batch_size` 和 
`checkpoint.interval` 共同决定。如果 `checkpoint.interval` 
的值足够大,则接收器写入器将在文件中写入行,直到文件中的行大于 `batch_size`。如果 `checkpoint.interval` 
很小,则接收器写入器将在新检查点触发时创建一个新文件。                                                     
                                   |
 | single_file_mode                 | boolean | 否    | false                    
                  | 每个并行度只会输出一个文件,当此参数开启时,batch_size就不会生效。输出的文件名没有文件块后缀。        
                                                                                
                                                                                
                                                                     |
+| create_empty_file_when_no_data   | boolean | 否    | false                    
                  | 当上游没有数据同步时,依然生成对应的数据文件。                                     
                                                                                
                                                                                
                                                                     |
 | compress_codec                   | string  | 否    | none                     
                  | 文件的压缩编解码器及其支持的细节如下所示:[txt: `lzo` `none`,json: `lzo` 
`none`,csv: `lzo` `none`,orc: `lzo` `snappy` `lz4` `zlib` `none`,parquet: `lzo` 
`snappy` `lz4` `gzip` `brotli` `zstd` `none`]。提示:excel类型不支持任何压缩格式。              
                                                                             |
 | krb5_path                        | string  | 否    | /etc/krb5.conf           
                  | kerberos 的 krb5 路径                                          
                                                                                
                                                                                
                                                                     |
 | kerberos_principal               | string  | 否    | -                        
                  | kerberos 的主体                                                
                                                                                
                                                                                
                                                                     |
@@ -65,7 +66,7 @@
 | enable_header_write              | boolean | 否    | false                    
                  | 仅在 file_format_type 为 text,csv 时使用。<br/> 
false:不写入表头,true:写入表头。                                                          
                                                                                
                                                                                
        |
 | max_rows_in_memory               | int     | 否    | -                        
                  | 仅当 file_format 为 excel 时使用。当文件格式为 Excel 时,可以缓存在内存中的最大数据项数。  
                                                                                
                                                                                
                                                                     |
 | sheet_name                       | string  | 否    | Sheet${Random number}    
                  | 仅当 file_format 为 excel 时使用。将工作簿的表写入指定的表名                    
                                                                                
                                                                                
                                                                     |
-| remote_user                           | string  | 否    | -                   
                       | Hdfs的远端用户名。                                            
                                                                                
                                                                                
                                                                          |
+| remote_user                      | string  | 否    | -                        
                  | Hdfs的远端用户名。                                                 
                                                                                
                                                                                
                                                                     |
 
 ### 提示
 
diff --git a/docs/zh/connector-v2/sink/LocalFile.md 
b/docs/zh/connector-v2/sink/LocalFile.md
index dbd7738488..13cfd3cfbf 100644
--- a/docs/zh/connector-v2/sink/LocalFile.md
+++ b/docs/zh/connector-v2/sink/LocalFile.md
@@ -50,6 +50,7 @@
 | is_enable_transaction                 | boolean | 否    | true                
                       | 是否启用事务                                                 
         |
 | batch_size                            | int     | 否    | 1000000             
                       | 批量大小                                                   
         |
 | single_file_mode                      | boolean | 否    | false               
                       | 每个并行度只会输出一个文件,当此参数开启时,batch_size就不会生效。输出的文件名没有文件块后缀。   
         |
+| create_empty_file_when_no_data        | boolean | 否    | false               
                       | 当上游没有数据同步时,依然生成对应的数据文件。                                
         |
 | compress_codec                        | string  | 否    | none                
                       | 压缩编码                                                   
         |
 | common-options                        | object  | 否    | -                   
                       | 常见选项                                                   
         |
 | max_rows_in_memory                    | int     | 否    | -                   
                       | 仅在 file_format_type 为 excel 时使用                        
         |
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
index 2957f451b4..bebf4fdbae 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
@@ -44,6 +44,8 @@ public class BaseFileSinkConfig implements DelimiterConfig, 
Serializable {
     protected String path;
     protected String fileNameExpression = 
BaseSinkConfig.FILE_NAME_EXPRESSION.defaultValue();
     protected boolean singleFileMode = 
BaseSinkConfig.SINGLE_FILE_MODE.defaultValue();
+    protected boolean createEmptyFileWhenNoData =
+            BaseSinkConfig.CREATE_EMPTY_FILE_WHEN_NO_DATA.defaultValue();
     protected FileFormat fileFormat = FileFormat.TEXT;
     protected DateUtils.Formatter dateFormat = DateUtils.Formatter.YYYY_MM_DD;
     protected DateTimeUtils.Formatter datetimeFormat = 
DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS;
@@ -87,6 +89,11 @@ public class BaseFileSinkConfig implements DelimiterConfig, 
Serializable {
             this.singleFileMode = 
config.getBoolean(BaseSinkConfig.SINGLE_FILE_MODE.key());
         }
 
+        if 
(config.hasPath(BaseSinkConfig.CREATE_EMPTY_FILE_WHEN_NO_DATA.key())) {
+            this.createEmptyFileWhenNoData =
+                    
config.getBoolean(BaseSinkConfig.CREATE_EMPTY_FILE_WHEN_NO_DATA.key());
+        }
+
         if (config.hasPath(BaseSinkConfig.FILE_FORMAT_TYPE.key())
                 && 
!StringUtils.isBlank(config.getString(BaseSinkConfig.FILE_FORMAT_TYPE.key()))) {
             this.fileFormat =
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
index 88be37bc10..d2d3c4d0cd 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
@@ -194,6 +194,13 @@ public class BaseSinkConfig extends KerberosConfig {
                     .withDescription(
                             "Whether to write all data to a single file in 
each parallelism task");
 
+    public static final Option<Boolean> CREATE_EMPTY_FILE_WHEN_NO_DATA =
+            Options.key("create_empty_file_when_no_data")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to generate an empty file when there is 
no data to write");
+
     public static final Option<String> FILENAME_TIME_FORMAT =
             Options.key("filename_time_format")
                     .stringType()
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
index a02e41c87b..09bc3ea285 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
@@ -60,6 +60,12 @@ public abstract class BaseFileSink
             throw new IllegalArgumentException(
                     "Single file mode is not supported when checkpoint is 
enabled or in streaming mode.");
         }
+        if 
(pluginConfig.hasPath(BaseSinkConfig.CREATE_EMPTY_FILE_WHEN_NO_DATA.key())
+                && 
pluginConfig.getBoolean(BaseSinkConfig.CREATE_EMPTY_FILE_WHEN_NO_DATA.key())
+                && !fileSinkConfig.getPartitionFieldList().isEmpty()) {
+            throw new IllegalArgumentException(
+                    "Generate empty file when no data is not supported when 
partition is enabled.");
+        }
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
index e48f8d3729..3ae380d307 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
@@ -44,6 +44,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import lombok.NonNull;
+import lombok.SneakyThrows;
 
 import java.io.File;
 import java.io.IOException;
@@ -59,7 +60,7 @@ import java.util.UUID;
 import java.util.regex.Matcher;
 import java.util.stream.Collectors;
 
-public abstract class AbstractWriteStrategy implements WriteStrategy {
+public abstract class AbstractWriteStrategy<T> implements WriteStrategy<T> {
     protected final Logger log = LoggerFactory.getLogger(this.getClass());
     protected final FileSinkConfig fileSinkConfig;
     protected final CompressFormat compressFormat;
@@ -248,8 +249,13 @@ public abstract class AbstractWriteStrategy implements 
WriteStrategy {
      *
      * @return the file commit information
      */
+    @SneakyThrows
     @Override
     public Optional<FileCommitInfo> prepareCommit() {
+        if (this.needMoveFiles.isEmpty() && 
fileSinkConfig.isCreateEmptyFileWhenNoData()) {
+            String filePath = createFilePathWithoutPartition();
+            this.getOrCreateOutputStream(filePath);
+        }
         this.finishAndCloseFile();
         LinkedHashMap<String, String> commitMap = new 
LinkedHashMap<>(this.needMoveFiles);
         LinkedHashMap<String, List<String>> copyMap =
@@ -361,10 +367,25 @@ public abstract class AbstractWriteStrategy implements 
WriteStrategy {
         return String.join(File.separator, strings);
     }
 
+    public String createFilePathWithoutPartition() {
+        return getPathWithPartitionInfo(null, true);
+    }
+
     public String getOrCreateFilePathBeingWritten(@NonNull SeaTunnelRow 
seaTunnelRow) {
         LinkedHashMap<String, List<String>> dataPartitionDirAndValuesMap =
                 generatorPartitionDir(seaTunnelRow);
-        String beingWrittenFileKey = 
dataPartitionDirAndValuesMap.keySet().toArray()[0].toString();
+        boolean noPartition =
+                BaseSinkConfig.NON_PARTITION.equals(
+                        
dataPartitionDirAndValuesMap.keySet().toArray()[0].toString());
+        return getPathWithPartitionInfo(dataPartitionDirAndValuesMap, 
noPartition);
+    }
+
+    private String getPathWithPartitionInfo(
+            LinkedHashMap<String, List<String>> dataPartitionDirAndValuesMap, 
boolean noPartition) {
+        String beingWrittenFileKey =
+                noPartition
+                        ? BaseSinkConfig.NON_PARTITION
+                        : 
dataPartitionDirAndValuesMap.keySet().toArray()[0].toString();
         // get filePath from beingWrittenFile
         String beingWrittenFilePath = 
beingWrittenFile.get(beingWrittenFileKey);
         if (beingWrittenFilePath != null) {
@@ -376,8 +397,7 @@ public abstract class AbstractWriteStrategy implements 
WriteStrategy {
                     };
             String newBeingWrittenFilePath = String.join(File.separator, 
pathSegments);
             beingWrittenFile.put(beingWrittenFileKey, newBeingWrittenFilePath);
-            if (!BaseSinkConfig.NON_PARTITION.equals(
-                    
dataPartitionDirAndValuesMap.keySet().toArray()[0].toString())) {
+            if (!noPartition) {
                 partitionDirAndValuesMap.putAll(dataPartitionDirAndValuesMap);
             }
             return newBeingWrittenFilePath;
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/BinaryWriteStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/BinaryWriteStrategy.java
index 06d05d6250..db3f0c1fc2 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/BinaryWriteStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/BinaryWriteStrategy.java
@@ -34,7 +34,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.LinkedHashMap;
 
-public class BinaryWriteStrategy extends AbstractWriteStrategy {
+public class BinaryWriteStrategy extends 
AbstractWriteStrategy<FSDataOutputStream> {
 
     private final LinkedHashMap<String, FSDataOutputStream> 
beingWrittenOutputStream;
     private final LinkedHashMap<String, Long> partIndexMap;
@@ -43,6 +43,11 @@ public class BinaryWriteStrategy extends 
AbstractWriteStrategy {
         super(fileSinkConfig);
         this.beingWrittenOutputStream = new LinkedHashMap<>();
         this.partIndexMap = new LinkedHashMap<>();
+        if (fileSinkConfig.isCreateEmptyFileWhenNoData()) {
+            throw new FileConnectorException(
+                    FileConnectorErrorCode.FORMAT_NOT_SUPPORT,
+                    "BinaryWriteStrategy does not support generating empty 
files when no data is written.");
+        }
     }
 
     @Override
@@ -88,7 +93,8 @@ public class BinaryWriteStrategy extends 
AbstractWriteStrategy {
         }
     }
 
-    private FSDataOutputStream getOrCreateOutputStream(@NonNull String 
filePath) {
+    @Override
+    public FSDataOutputStream getOrCreateOutputStream(@NonNull String 
filePath) {
         FSDataOutputStream fsDataOutputStream = 
beingWrittenOutputStream.get(filePath);
         if (fsDataOutputStream == null) {
             try {
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ExcelWriteStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ExcelWriteStrategy.java
index 1e4f90aadd..0fa1d260b4 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ExcelWriteStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ExcelWriteStrategy.java
@@ -29,7 +29,7 @@ import lombok.NonNull;
 import java.io.IOException;
 import java.util.LinkedHashMap;
 
-public class ExcelWriteStrategy extends AbstractWriteStrategy {
+public class ExcelWriteStrategy extends AbstractWriteStrategy<ExcelGenerator> {
     private final LinkedHashMap<String, ExcelGenerator> beingWrittenWriter;
 
     public ExcelWriteStrategy(FileSinkConfig fileSinkConfig) {
@@ -41,7 +41,7 @@ public class ExcelWriteStrategy extends AbstractWriteStrategy 
{
     public void write(SeaTunnelRow seaTunnelRow) {
         super.write(seaTunnelRow);
         String filePath = getOrCreateFilePathBeingWritten(seaTunnelRow);
-        ExcelGenerator excelGenerator = getOrCreateExcelGenerator(filePath);
+        ExcelGenerator excelGenerator = getOrCreateOutputStream(filePath);
         excelGenerator.writeData(seaTunnelRow);
     }
 
@@ -63,7 +63,8 @@ public class ExcelWriteStrategy extends AbstractWriteStrategy 
{
         beingWrittenWriter.clear();
     }
 
-    private ExcelGenerator getOrCreateExcelGenerator(@NonNull String filePath) 
{
+    @Override
+    public ExcelGenerator getOrCreateOutputStream(@NonNull String filePath) {
         ExcelGenerator excelGenerator = this.beingWrittenWriter.get(filePath);
         if (excelGenerator == null) {
             excelGenerator =
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java
index 23fb7893a8..bc4a08b2d8 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java
@@ -39,7 +39,7 @@ import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
-public class JsonWriteStrategy extends AbstractWriteStrategy {
+public class JsonWriteStrategy extends 
AbstractWriteStrategy<FSDataOutputStream> {
     private final byte[] rowDelimiter;
     private SerializationSchema serializationSchema;
     private final LinkedHashMap<String, FSDataOutputStream> 
beingWrittenOutputStream;
@@ -111,7 +111,8 @@ public class JsonWriteStrategy extends 
AbstractWriteStrategy {
         isFirstWrite.clear();
     }
 
-    private FSDataOutputStream getOrCreateOutputStream(@NonNull String 
filePath) {
+    @Override
+    public FSDataOutputStream getOrCreateOutputStream(@NonNull String 
filePath) {
         FSDataOutputStream fsDataOutputStream = 
beingWrittenOutputStream.get(filePath);
         if (fsDataOutputStream == null) {
             try {
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
index f6b47ce4d2..366c9bb82a 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
@@ -59,7 +59,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
-public class OrcWriteStrategy extends AbstractWriteStrategy {
+public class OrcWriteStrategy extends AbstractWriteStrategy<Writer> {
     private final LinkedHashMap<String, Writer> beingWrittenWriter;
 
     public OrcWriteStrategy(FileSinkConfig fileSinkConfig) {
@@ -71,7 +71,7 @@ public class OrcWriteStrategy extends AbstractWriteStrategy {
     public void write(@NonNull SeaTunnelRow seaTunnelRow) {
         super.write(seaTunnelRow);
         String filePath = getOrCreateFilePathBeingWritten(seaTunnelRow);
-        Writer writer = getOrCreateWriter(filePath);
+        Writer writer = getOrCreateOutputStream(filePath);
         TypeDescription schema = buildSchemaWithRowType();
         VectorizedRowBatch rowBatch = schema.createRowBatch();
         int i = 0;
@@ -109,7 +109,8 @@ public class OrcWriteStrategy extends AbstractWriteStrategy 
{
         this.beingWrittenWriter.clear();
     }
 
-    private Writer getOrCreateWriter(@NonNull String filePath) {
+    @Override
+    public Writer getOrCreateOutputStream(@NonNull String filePath) {
         Writer writer = this.beingWrittenWriter.get(filePath);
         if (writer == null) {
             TypeDescription schema = buildSchemaWithRowType();
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
index b0f873296c..41155fd829 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
@@ -74,7 +74,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
-public class ParquetWriteStrategy extends AbstractWriteStrategy {
+public class ParquetWriteStrategy extends 
AbstractWriteStrategy<ParquetWriter<GenericRecord>> {
     private final LinkedHashMap<String, ParquetWriter<GenericRecord>> 
beingWrittenWriter;
     private AvroSchemaConverter schemaConverter;
     private Schema schema;
@@ -119,7 +119,7 @@ public class ParquetWriteStrategy extends 
AbstractWriteStrategy {
     public void write(@NonNull SeaTunnelRow seaTunnelRow) {
         super.write(seaTunnelRow);
         String filePath = getOrCreateFilePathBeingWritten(seaTunnelRow);
-        ParquetWriter<GenericRecord> writer = getOrCreateWriter(filePath);
+        ParquetWriter<GenericRecord> writer = 
getOrCreateOutputStream(filePath);
         GenericRecordBuilder recordBuilder = new GenericRecordBuilder(schema);
         for (Integer integer : sinkColumnsIndexInRow) {
             String fieldName = seaTunnelRowType.getFieldName(integer);
@@ -155,7 +155,8 @@ public class ParquetWriteStrategy extends 
AbstractWriteStrategy {
         this.beingWrittenWriter.clear();
     }
 
-    private ParquetWriter<GenericRecord> getOrCreateWriter(@NonNull String 
filePath) {
+    @Override
+    public ParquetWriter<GenericRecord> getOrCreateOutputStream(@NonNull 
String filePath) {
         if (schema == null) {
             schema = buildAvroSchemaWithRowType(seaTunnelRowType, 
sinkColumnsIndexInRow);
         }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
index 77e2eb5c5b..262448d195 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
@@ -43,7 +43,7 @@ import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
-public class TextWriteStrategy extends AbstractWriteStrategy {
+public class TextWriteStrategy extends 
AbstractWriteStrategy<FSDataOutputStream> {
     private final LinkedHashMap<String, FSDataOutputStream> 
beingWrittenOutputStream;
     private final Map<String, Boolean> isFirstWrite;
     private final String fieldDelimiter;
@@ -132,7 +132,8 @@ public class TextWriteStrategy extends 
AbstractWriteStrategy {
         isFirstWrite.clear();
     }
 
-    private FSDataOutputStream getOrCreateOutputStream(@NonNull String 
filePath) {
+    @Override
+    public FSDataOutputStream getOrCreateOutputStream(@NonNull String 
filePath) {
         FSDataOutputStream fsDataOutputStream = 
beingWrittenOutputStream.get(filePath);
         if (fsDataOutputStream == null) {
             try {
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java
index 24b23c9bfc..25b84714d4 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java
@@ -27,11 +27,12 @@ import 
org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig
 import org.apache.hadoop.conf.Configuration;
 
 import java.io.Closeable;
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.LinkedHashMap;
 import java.util.List;
 
-public interface WriteStrategy extends Transaction, Serializable, Closeable {
+public interface WriteStrategy<T> extends Transaction, Serializable, Closeable 
{
     /**
      * init hadoop conf
      *
@@ -70,6 +71,8 @@ public interface WriteStrategy extends Transaction, 
Serializable, Closeable {
      */
     LinkedHashMap<String, List<String>> generatorPartitionDir(SeaTunnelRow 
seaTunnelRow);
 
+    T getOrCreateOutputStream(String path) throws IOException;
+
     /**
      * use transaction id generate file name
      *
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/XmlWriteStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/XmlWriteStrategy.java
index 74fa220031..adcc57b3dc 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/XmlWriteStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/XmlWriteStrategy.java
@@ -35,7 +35,7 @@ import java.util.LinkedHashMap;
  * ensures that each file is written to only once. It writes the data by 
passing the data row to the
  * corresponding XmlWriter instance.
  */
-public class XmlWriteStrategy extends AbstractWriteStrategy {
+public class XmlWriteStrategy extends AbstractWriteStrategy<XmlWriter> {
 
     private final LinkedHashMap<String, XmlWriter> beingWrittenWriter;
 
@@ -48,7 +48,7 @@ public class XmlWriteStrategy extends AbstractWriteStrategy {
     public void write(SeaTunnelRow seaTunnelRow) throws FileConnectorException 
{
         super.write(seaTunnelRow);
         String filePath = getOrCreateFilePathBeingWritten(seaTunnelRow);
-        XmlWriter xmlDocWriter = getOrCreateXmlWriter(filePath);
+        XmlWriter xmlDocWriter = getOrCreateOutputStream(filePath);
         xmlDocWriter.writeData(seaTunnelRow);
     }
 
@@ -70,7 +70,8 @@ public class XmlWriteStrategy extends AbstractWriteStrategy {
         this.beingWrittenWriter.clear();
     }
 
-    private XmlWriter getOrCreateXmlWriter(String filePath) {
+    @Override
+    public XmlWriter getOrCreateOutputStream(String filePath) {
         return beingWrittenWriter.computeIfAbsent(
                 filePath,
                 k -> new XmlWriter(fileSinkConfig, sinkColumnsIndexInRow, 
seaTunnelRowType));
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/sink/CosFileSinkFactory.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/sink/CosFileSinkFactory.java
index b728af4951..b7b6185498 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/sink/CosFileSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/sink/CosFileSinkFactory.java
@@ -93,6 +93,7 @@ public class CosFileSinkFactory implements TableSinkFactory {
                 .optional(BaseSinkConfig.TIME_FORMAT)
                 .optional(BaseSinkConfig.SINGLE_FILE_MODE)
                 .optional(BaseSinkConfig.BATCH_SIZE)
+                .optional(BaseSinkConfig.CREATE_EMPTY_FILE_WHEN_NO_DATA)
                 .build();
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java
index 3dc48bd3bb..0a9dfcb3d7 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java
@@ -107,6 +107,7 @@ public class FtpFileSinkFactory extends 
BaseMultipleTableFileSinkFactory {
                 .optional(FtpConfigOptions.FTP_CONNECTION_MODE)
                 .optional(BaseSinkConfig.SINGLE_FILE_MODE)
                 .optional(BaseSinkConfig.BATCH_SIZE)
+                .optional(BaseSinkConfig.CREATE_EMPTY_FILE_WHEN_NO_DATA)
                 .build();
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSinkFactory.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSinkFactory.java
index b937a8bed0..1f5d98b0be 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSinkFactory.java
@@ -95,6 +95,7 @@ public class HdfsFileSinkFactory implements TableSinkFactory {
                 .optional(BaseSinkConfig.KERBEROS_KEYTAB_PATH)
                 .optional(BaseSinkConfig.KRB5_PATH)
                 .optional(BaseSinkConfig.REMOTE_USER)
+                .optional(BaseSinkConfig.CREATE_EMPTY_FILE_WHEN_NO_DATA)
                 .build();
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/sink/OssFileSinkFactory.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/sink/OssFileSinkFactory.java
index 7ecbb6c3f1..2064c2937e 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/sink/OssFileSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/sink/OssFileSinkFactory.java
@@ -93,6 +93,7 @@ public class OssFileSinkFactory implements TableSinkFactory {
                 .optional(BaseSinkConfig.TIME_FORMAT)
                 .optional(BaseSinkConfig.SINGLE_FILE_MODE)
                 .optional(BaseSinkConfig.BATCH_SIZE)
+                .optional(BaseSinkConfig.CREATE_EMPTY_FILE_WHEN_NO_DATA)
                 .build();
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java
index 8450f13999..c6e75615be 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java
@@ -100,6 +100,7 @@ public class LocalFileSinkFactory extends 
BaseMultipleTableFileSinkFactory {
                 .optional(BaseSinkConfig.TIME_FORMAT)
                 .optional(BaseSinkConfig.SINGLE_FILE_MODE)
                 .optional(BaseSinkConfig.BATCH_SIZE)
+                .optional(BaseSinkConfig.CREATE_EMPTY_FILE_WHEN_NO_DATA)
                 .build();
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/local/LocalFileTest.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/local/LocalFileTest.java
index 8280ec5e2a..f1fffa4ece 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/local/LocalFileTest.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/local/LocalFileTest.java
@@ -25,6 +25,7 @@ import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.common.utils.FileUtils;
+import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
 import 
org.apache.seatunnel.connectors.seatunnel.file.local.sink.LocalFileSinkFactory;
 import org.apache.seatunnel.connectors.seatunnel.sink.SinkFlowTestUtils;
 
@@ -33,7 +34,9 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.condition.DisabledOnOs;
 import org.junit.jupiter.api.condition.OS;
 
+import java.io.File;
 import java.io.IOException;
+import java.nio.file.Paths;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -153,4 +156,78 @@ public class LocalFileTest {
                         FileUtils.getFileLineNumber(
                                 
"/tmp/seatunnel/LocalFileTest/only_one_file_1.txt"));
     }
+
+    @Test
+    void testCreateEmptyFileWhenNoData() throws IOException {
+        Map<String, Object> options =
+                new HashMap<String, Object>() {
+                    {
+                        put("path", "/tmp/seatunnel/LocalFileTest");
+                        put("row_delimiter", "\n");
+                        put("file_name_expression", "empty_file");
+                        put("is_enable_transaction", false);
+                        put("batch_size", 1);
+                        put("create_empty_file_when_no_data", true);
+                    }
+                };
+        options.put("file_format_type", "text");
+        FileUtils.deleteFile("/tmp/seatunnel/LocalFileTest");
+        SinkFlowTestUtils.runBatchWithCheckpointDisabled(
+                catalogTable,
+                ReadonlyConfig.fromMap(options),
+                new LocalFileSinkFactory(),
+                Collections.emptyList());
+        Assertions.assertEquals(
+                0,
+                (long)
+                        FileUtils.getFileLineNumber(
+                                
"/tmp/seatunnel/LocalFileTest/empty_file_0.txt"));
+
+        options.put("file_format_type", "csv");
+        FileUtils.deleteFile("/tmp/seatunnel/LocalFileTest");
+        SinkFlowTestUtils.runBatchWithCheckpointDisabled(
+                catalogTable,
+                ReadonlyConfig.fromMap(options),
+                new LocalFileSinkFactory(),
+                Collections.emptyList());
+        Assertions.assertEquals(
+                0,
+                (long)
+                        FileUtils.getFileLineNumber(
+                                
"/tmp/seatunnel/LocalFileTest/empty_file_0.csv"));
+
+        options.put("enable_header_write", true);
+        SinkFlowTestUtils.runBatchWithCheckpointDisabled(
+                catalogTable,
+                ReadonlyConfig.fromMap(options),
+                new LocalFileSinkFactory(),
+                Collections.emptyList());
+        Assertions.assertEquals(
+                "test\n",
+                FileUtils.readFileToStr(
+                        
Paths.get("/tmp/seatunnel/LocalFileTest/empty_file_0.csv")));
+
+        options.put("file_format_type", "parquet");
+        SinkFlowTestUtils.runBatchWithCheckpointDisabled(
+                catalogTable,
+                ReadonlyConfig.fromMap(options),
+                new LocalFileSinkFactory(),
+                Collections.emptyList());
+        Assertions.assertEquals(
+                300, new 
File("/tmp/seatunnel/LocalFileTest/empty_file_0.parquet").length());
+
+        options.put("file_format_type", "binary");
+        FileConnectorException exception =
+                Assertions.assertThrows(
+                        FileConnectorException.class,
+                        () ->
+                                
SinkFlowTestUtils.runBatchWithCheckpointDisabled(
+                                        catalogTable,
+                                        ReadonlyConfig.fromMap(options),
+                                        new LocalFileSinkFactory(),
+                                        Collections.emptyList()));
+        Assertions.assertEquals(
+                "ErrorCode:[FILE-07], ErrorDescription:[Format not support] - 
BinaryWriteStrategy does not support generating empty files when no data is 
written.",
+                exception.getMessage());
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java
index 246c769b76..82a062faf1 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java
@@ -105,6 +105,7 @@ public class OssFileSinkFactory extends 
BaseMultipleTableFileSinkFactory {
                 .optional(BaseSinkConfig.TIME_FORMAT)
                 .optional(BaseSinkConfig.SINGLE_FILE_MODE)
                 .optional(BaseSinkConfig.BATCH_SIZE)
+                .optional(BaseSinkConfig.CREATE_EMPTY_FILE_WHEN_NO_DATA)
                 .optional(SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
                 .build();
     }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java
index 950582a860..492605b874 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java
@@ -105,6 +105,7 @@ public class S3FileSinkFactory implements TableSinkFactory {
                 .optional(BaseSinkConfig.TIME_FORMAT)
                 .optional(BaseSinkConfig.SINGLE_FILE_MODE)
                 .optional(BaseSinkConfig.BATCH_SIZE)
+                .optional(BaseSinkConfig.CREATE_EMPTY_FILE_WHEN_NO_DATA)
                 .optional(BaseSinkConfig.TMP_PATH)
                 .optional(SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
                 .build();
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSinkFactory.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSinkFactory.java
index 4ff9c6928f..4ab3d18651 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSinkFactory.java
@@ -105,6 +105,7 @@ public class SftpFileSinkFactory extends 
BaseMultipleTableFileSinkFactory {
                 .optional(BaseSinkConfig.TIME_FORMAT)
                 .optional(BaseSinkConfig.SINGLE_FILE_MODE)
                 .optional(BaseSinkConfig.BATCH_SIZE)
+                .optional(BaseSinkConfig.CREATE_EMPTY_FILE_WHEN_NO_DATA)
                 .build();
     }
 

Reply via email to