This is an automated email from the ASF dual-hosted git repository.

wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 2dbf02df47 [Feature][File] Support config null format for text file 
read (#8109)
2dbf02df47 is described below

commit 2dbf02df47617d4dad6f970fd03952a78c884dfd
Author: hailin0 <wanghai...@apache.org>
AuthorDate: Wed Nov 27 12:34:46 2024 +0800

    [Feature][File] Support config null format for text file read (#8109)
---
 docs/en/connector-v2/source/FtpFile.md             | 10 ++-
 docs/en/connector-v2/source/HdfsFile.md            |  1 +
 docs/en/connector-v2/source/LocalFile.md           |  8 ++
 docs/en/connector-v2/source/OssFile.md             |  1 +
 docs/en/connector-v2/source/OssJindoFile.md        |  8 ++
 docs/en/connector-v2/source/S3File.md              |  1 +
 docs/en/connector-v2/source/SftpFile.md            |  3 +-
 docs/zh/connector-v2/source/HdfsFile.md            |  1 +
 .../assertion/excecutor/AssertExecutor.java        | 10 +--
 .../file/config/BaseSourceConfigOptions.java       |  6 ++
 .../file/source/reader/TextReadStrategy.java       | 19 +++--
 .../file/cos/source/CosFileSourceFactory.java      |  1 +
 .../file/ftp/source/FtpFileSourceFactory.java      |  1 +
 .../file/hdfs/source/HdfsFileSourceFactory.java    |  1 +
 .../oss/jindo/source/OssFileSourceFactory.java     |  1 +
 .../file/local/source/LocalFileSourceFactory.java  |  1 +
 .../file/obs/source/ObsFileSourceFactory.java      |  1 +
 .../file/oss/source/OssFileSourceFactory.java      |  1 +
 .../file/s3/source/S3FileSourceFactory.java        |  1 +
 .../file/sftp/source/SftpFileSourceFactory.java    |  1 +
 .../hive/source/config/HiveSourceConfig.java       | 15 ++++
 .../e2e/connector/file/local/LocalFileIT.java      |  7 ++
 .../src/test/resources/text/e2e_null_format.txt    |  3 +
 .../text/local_file_null_format_assert.conf        | 95 ++++++++++++++++++++++
 .../format/text/TextDeserializationSchema.java     | 25 +++++-
 25 files changed, 208 insertions(+), 14 deletions(-)

diff --git a/docs/en/connector-v2/source/FtpFile.md 
b/docs/en/connector-v2/source/FtpFile.md
index 59f3852cb0..f65255bfd7 100644
--- a/docs/en/connector-v2/source/FtpFile.md
+++ b/docs/en/connector-v2/source/FtpFile.md
@@ -38,7 +38,7 @@ If you use SeaTunnel Engine, It automatically integrated the 
hadoop jar when you
 
 ## Options
 
-|           name            |  type   | required |    default value    |
+| name                      | type    | required | default value       |
 |---------------------------|---------|----------|---------------------|
 | host                      | string  | yes      | -                   |
 | port                      | int     | yes      | -                   |
@@ -62,6 +62,7 @@ If you use SeaTunnel Engine, It automatically integrated the 
hadoop jar when you
 | compress_codec            | string  | no       | none                |
 | archive_compress_codec    | string  | no       | none                |
 | encoding                  | string  | no       | UTF-8               |
+| null_format               | string  | no       | -                   |
 | common-options            |         | no       | -                   |
 
 ### host [string]
@@ -336,6 +337,13 @@ The compress codec of archive files and the details that 
supported as the follow
 Only used when file_format_type is json,text,csv,xml.
 The encoding of the file to read. This param will be parsed by 
`Charset.forName(encoding)`.
 
+### null_format [string]
+
+Only used when file_format_type is text.
+Defines the string in the file that is read as a null value.
+
+e.g. `\N`
+
 ### common options
 
 Source plugin common parameters, please refer to [Source Common 
Options](../source-common-options.md) for details.
diff --git a/docs/en/connector-v2/source/HdfsFile.md 
b/docs/en/connector-v2/source/HdfsFile.md
index 161b0e6318..caaf9972a0 100644
--- a/docs/en/connector-v2/source/HdfsFile.md
+++ b/docs/en/connector-v2/source/HdfsFile.md
@@ -66,6 +66,7 @@ Read data from hdfs file system.
 | compress_codec            | string  | no       | none                | The 
compress codec of files                                                         
                                                                                
                                                                                
                                                                                
          |
 | archive_compress_codec    | string  | no       | none                |
 | encoding                  | string  | no       | UTF-8               |       
                                                                                
                                                                                
                                                                                
                                                                                
        |
+| null_format               | string  | no       | -                   | Only 
used when file_format_type is text. null_format to define which strings can be 
represented as null. e.g: `\N`                                                  
                                                                                
                                                                                
          |
 | common-options            |         | no       | -                   | 
Source plugin common parameters, please refer to [Source Common 
Options](../source-common-options.md) for details.                              
                                                                                
                                                                                
                              |
 
 ### delimiter/field_delimiter [string]
diff --git a/docs/en/connector-v2/source/LocalFile.md 
b/docs/en/connector-v2/source/LocalFile.md
index 8923a03160..477a4d4139 100644
--- a/docs/en/connector-v2/source/LocalFile.md
+++ b/docs/en/connector-v2/source/LocalFile.md
@@ -62,6 +62,7 @@ If you use SeaTunnel Engine, It automatically integrated the 
hadoop jar when you
 | compress_codec            | string  | no       | none                        
         |
 | archive_compress_codec    | string  | no       | none                        
         |
 | encoding                  | string  | no       | UTF-8                       
         |
+| null_format               | string  | no       | -                           
         | 
 | common-options            |         | no       | -                           
         |
 | tables_configs            | list    | no       | used to define a multiple 
table task |
 
@@ -330,6 +331,13 @@ The compress codec of archive files and the details that 
supported as the follow
 Only used when file_format_type is json,text,csv,xml.
 The encoding of the file to read. This param will be parsed by 
`Charset.forName(encoding)`.
 
+### null_format [string]
+
+Only used when file_format_type is text.
+Defines the string in the file that is read as a null value.
+
+e.g. `\N`
+
 ### common options
 
 Source plugin common parameters, please refer to [Source Common 
Options](../source-common-options.md) for details
diff --git a/docs/en/connector-v2/source/OssFile.md 
b/docs/en/connector-v2/source/OssFile.md
index ae2686b7e8..42163a9d13 100644
--- a/docs/en/connector-v2/source/OssFile.md
+++ b/docs/en/connector-v2/source/OssFile.md
@@ -211,6 +211,7 @@ If you assign file type to `parquet` `orc`, schema option 
not required, connecto
 | xml_use_attr_format       | boolean | no       | -                   | 
Specifies whether to process data using the tag attribute format, only used 
when file_format is xml.                                                        
                                                                                
                                                                                
        |
 | compress_codec            | string  | no       | none                | Which 
compress codec the files used.                                                  
                                                                                
                                                                                
                                                                              |
 | encoding                  | string  | no       | UTF-8               |
+| null_format               | string  | no       | -                   | Only 
used when file_format_type is text. null_format to define which strings can be 
represented as null. e.g: `\N`                                                  
                                                                                
                                                                                
|
 | file_filter_pattern       | string  | no       |                     | 
Filter pattern, which used for filtering files.                                 
                                                                                
                                                                                
                                                                                
    |
 | common-options            | config  | no       | -                   | 
Source plugin common parameters, please refer to [Source Common 
Options](../source-common-options.md) for details.                              
                                                                                
                                                                                
                    |
 
diff --git a/docs/en/connector-v2/source/OssJindoFile.md 
b/docs/en/connector-v2/source/OssJindoFile.md
index 1db5d62a44..9b83a0b050 100644
--- a/docs/en/connector-v2/source/OssJindoFile.md
+++ b/docs/en/connector-v2/source/OssJindoFile.md
@@ -72,6 +72,7 @@ It only supports hadoop version **2.9.X+**.
 | compress_codec            | string  | no       | none                |
 | archive_compress_codec    | string  | no       | none                |
 | encoding                  | string  | no       | UTF-8               |
+| null_format               | string  | no       | -                   |
 | common-options            |         | no       | -                   |
 
 ### path [string]
@@ -343,6 +344,13 @@ The compress codec of archive files and the details that 
supported as the follow
 Only used when file_format_type is json,text,csv,xml.
 The encoding of the file to read. This param will be parsed by 
`Charset.forName(encoding)`.
 
+### null_format [string]
+
+Only used when file_format_type is text.
+Defines the string in the file that is read as a null value.
+
+e.g. `\N`
+
 ### common options
 
 Source plugin common parameters, please refer to [Source Common 
Options](../source-common-options.md) for details.
diff --git a/docs/en/connector-v2/source/S3File.md 
b/docs/en/connector-v2/source/S3File.md
index ba4b71cfe9..b0e69cd1e3 100644
--- a/docs/en/connector-v2/source/S3File.md
+++ b/docs/en/connector-v2/source/S3File.md
@@ -220,6 +220,7 @@ If you assign file type to `parquet` `orc`, schema option 
not required, connecto
 | compress_codec                  | string  | no       | none                  
                                |                                               
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
 | archive_compress_codec          | string  | no       | none                  
                                |                                               
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
 | encoding                        | string  | no       | UTF-8                 
                                |                                               
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| null_format                     | string  | no       | -                     
                                | Only used when file_format_type is text. 
null_format to define which strings can be represented as null. e.g: `\N`       
                                                                                
                                                                                
                                                                                
                   [...]
 | file_filter_pattern             | string  | no       |                       
                                | Filter pattern, which used for filtering 
files.                                                                          
                                                                                
                                                                                
                                                                                
                   [...]
 | common-options                  |         | no       | -                     
                                | Source plugin common parameters, please refer 
to [Source Common Options](../source-common-options.md) for details.            
                                                                                
                                                                                
                                                                                
              [...]
 
diff --git a/docs/en/connector-v2/source/SftpFile.md 
b/docs/en/connector-v2/source/SftpFile.md
index c6e4aa6c84..f5b76ce305 100644
--- a/docs/en/connector-v2/source/SftpFile.md
+++ b/docs/en/connector-v2/source/SftpFile.md
@@ -71,7 +71,7 @@ The File does not have a specific type list, and we can 
indicate which SeaTunnel
 
 ## Source Options
 
-|           Name            |  Type   | Required |    default value    | 
Description                                                                     
                                                                                
                                                                                
                                                                                
                                                |
+| Name                      | Type    | Required | default value       | 
Description                                                                     
                                                                                
                                                                                
                                                                                
                                                |
 
|---------------------------|---------|----------|---------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
 | host                      | String  | Yes      | -                   | The 
target sftp host is required                                                    
                                                                                
                                                                                
                                                                                
                                            |
 | port                      | Int     | Yes      | -                   | The 
target sftp port is required                                                    
                                                                                
                                                                                
                                                                                
                                            |
@@ -94,6 +94,7 @@ The File does not have a specific type list, and we can 
indicate which SeaTunnel
 | compress_codec            | String  | No       | None                | The 
compress codec of files and the details that supported as the following shown: 
<br/> - txt: `lzo` `None` <br/> - json: `lzo` `None` <br/> - csv: `lzo` `None` 
<br/> - orc: `lzo` `snappy` `lz4` `zlib` `None` <br/> - parquet: `lzo` `snappy` 
`lz4` `gzip` `brotli` `zstd` `None` <br/> Tips: excel type does Not support any 
compression format                            |
 | archive_compress_codec    | string  | no       | none                |
 | encoding                  | string  | no       | UTF-8               |
+| null_format               | string  | no       | -                   | Only 
used when file_format_type is text. null_format to define which strings can be 
represented as null. e.g: `\N`                                                  
                                                                                
                                                                                
                                            |
 | common-options            |         | No       | -                   | 
Source plugin common parameters, please refer to [Source Common 
Options](../source-common-options.md) for details.                              
                                                                                
                                                                                
                                                                |
 
 ### file_filter_pattern [string]
diff --git a/docs/zh/connector-v2/source/HdfsFile.md 
b/docs/zh/connector-v2/source/HdfsFile.md
index 9cd254ef80..4de3014f5c 100644
--- a/docs/zh/connector-v2/source/HdfsFile.md
+++ b/docs/zh/connector-v2/source/HdfsFile.md
@@ -56,6 +56,7 @@
 | kerberos_keytab_path      | string  | 否    | -              | kerberos 的 
keytab 路径。                                                                      
                                                                                
                                                                 |
 | skip_header_row_number    | long    | 否    | 0              | 跳过前几行,但仅适用于 
txt 和 csv。例如,设置如下:`skip_header_row_number = 2`。然后 Seatunnel 将跳过源文件中的前两行。        
                                                                                
                                                                |
 | file_filter_pattern       | string  | 否    | -              | 过滤模式,用于过滤文件。   
                                                                                
                                                                                
                                                             |
+| null_format               | string  | 否    | -              | 定义哪些字符串可以表示为 
null,但仅适用于 txt 和 csv. 例如: `\N`                                                  
                                                                                
                                                               |
 | schema                    | config  | 否    | -              | 上游数据的模式字段。     
                                                                                
                                                                                
                                                             |
 | sheet_name                | string  | 否    | -              | 
读取工作簿的表格,仅在文件格式为 excel 时使用。                                                     
                                                                                
                                                                            |
 | compress_codec            | string  | 否    | none           | 文件的压缩编解码器。     
                                                                                
                                                                                
                                                             |
diff --git 
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/excecutor/AssertExecutor.java
 
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/excecutor/AssertExecutor.java
index f6908b989d..33193cb314 100644
--- 
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/excecutor/AssertExecutor.java
+++ 
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/excecutor/AssertExecutor.java
@@ -285,11 +285,11 @@ public class AssertExecutor {
 
     private Boolean checkType(Object value, SeaTunnelDataType<?> fieldType) {
         if (value == null) {
-            if (fieldType.getSqlType() == SqlType.NULL) {
-                return true;
-            } else {
-                return false;
-            }
+            return true;
+        }
+
+        if (fieldType.getSqlType() == SqlType.NULL) {
+            return false;
         }
 
         if (fieldType.getSqlType() == SqlType.ROW) {
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfigOptions.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfigOptions.java
index de45726e3c..dedddeacbb 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfigOptions.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfigOptions.java
@@ -48,6 +48,12 @@ public class BaseSourceConfigOptions {
                     .withDescription(
                             "The separator between columns in a row of data. 
Only needed by `text` file format");
 
+    public static final Option<String> NULL_FORMAT =
+            Options.key("null_format")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The string that represents a null 
value");
+
     public static final Option<String> ENCODING =
             Options.key("encoding")
                     .stringType()
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
index 1a7a7398a4..b13aad765a 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
@@ -157,10 +157,15 @@ public class TextReadStrategy extends 
AbstractReadStrategy {
                     "When reading json/text/csv files, if user has not 
specified schema information, "
                             + "SeaTunnel will not support column projection");
         }
+        ReadonlyConfig readonlyConfig = 
ReadonlyConfig.fromConfig(pluginConfig);
         TextDeserializationSchema.Builder builder =
                 TextDeserializationSchema.builder()
                         .delimiter(TextFormatConstant.PLACEHOLDER)
-                        .textLineSplitor(textLineSplitor);
+                        .textLineSplitor(textLineSplitor)
+                        .nullFormat(
+                                readonlyConfig
+                                        
.getOptional(BaseSourceConfigOptions.NULL_FORMAT)
+                                        .orElse(null));
         if (isMergePartition) {
             deserializationSchema =
                     
builder.seaTunnelRowType(this.seaTunnelRowTypeWithPartition).build();
@@ -175,11 +180,11 @@ public class TextReadStrategy extends 
AbstractReadStrategy {
         SeaTunnelRowType rowType = catalogTable.getSeaTunnelRowType();
         SeaTunnelRowType userDefinedRowTypeWithPartition =
                 mergePartitionTypes(fileNames.get(0), rowType);
+        ReadonlyConfig readonlyConfig = 
ReadonlyConfig.fromConfig(pluginConfig);
         Optional<String> fieldDelimiterOptional =
-                ReadonlyConfig.fromConfig(pluginConfig)
-                        .getOptional(BaseSourceConfigOptions.FIELD_DELIMITER);
+                
readonlyConfig.getOptional(BaseSourceConfigOptions.FIELD_DELIMITER);
         encoding =
-                ReadonlyConfig.fromConfig(pluginConfig)
+                readonlyConfig
                         .getOptional(BaseSourceConfigOptions.ENCODING)
                         .orElse(StandardCharsets.UTF_8.name());
         if (fieldDelimiterOptional.isPresent()) {
@@ -198,7 +203,11 @@ public class TextReadStrategy extends AbstractReadStrategy 
{
         TextDeserializationSchema.Builder builder =
                 TextDeserializationSchema.builder()
                         .delimiter(fieldDelimiter)
-                        .textLineSplitor(textLineSplitor);
+                        .textLineSplitor(textLineSplitor)
+                        .nullFormat(
+                                readonlyConfig
+                                        
.getOptional(BaseSourceConfigOptions.NULL_FORMAT)
+                                        .orElse(null));
         if (isMergePartition) {
             deserializationSchema =
                     
builder.seaTunnelRowType(userDefinedRowTypeWithPartition).build();
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSourceFactory.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSourceFactory.java
index 388d245047..ab519d299d 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSourceFactory.java
@@ -72,6 +72,7 @@ public class CosFileSourceFactory implements 
TableSourceFactory {
                 .optional(BaseSourceConfigOptions.FILE_FILTER_PATTERN)
                 .optional(BaseSourceConfigOptions.COMPRESS_CODEC)
                 .optional(BaseSourceConfigOptions.ARCHIVE_COMPRESS_CODEC)
+                .optional(BaseSourceConfigOptions.NULL_FORMAT)
                 .build();
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java
index 9bb4b98e05..3411e9c407 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java
@@ -83,6 +83,7 @@ public class FtpFileSourceFactory implements 
TableSourceFactory {
                 .optional(BaseSourceConfigOptions.COMPRESS_CODEC)
                 .optional(FtpConfigOptions.FTP_CONNECTION_MODE)
                 .optional(BaseSourceConfigOptions.ARCHIVE_COMPRESS_CODEC)
+                .optional(BaseSourceConfigOptions.NULL_FORMAT)
                 .build();
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java
index 88e4684180..f09dba2c1f 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java
@@ -69,6 +69,7 @@ public class HdfsFileSourceFactory implements 
TableSourceFactory {
                 .optional(BaseSourceConfigOptions.FILE_FILTER_PATTERN)
                 .optional(BaseSourceConfigOptions.COMPRESS_CODEC)
                 .optional(BaseSourceConfigOptions.ARCHIVE_COMPRESS_CODEC)
+                .optional(BaseSourceConfigOptions.NULL_FORMAT)
                 .build();
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/source/OssFileSourceFactory.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/source/OssFileSourceFactory.java
index a6c9276c76..f195ac3071 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/source/OssFileSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/source/OssFileSourceFactory.java
@@ -71,6 +71,7 @@ public class OssFileSourceFactory implements 
TableSourceFactory {
                 .optional(BaseSourceConfigOptions.TIME_FORMAT)
                 .optional(BaseSourceConfigOptions.FILE_FILTER_PATTERN)
                 .optional(BaseSourceConfigOptions.COMPRESS_CODEC)
+                .optional(BaseSourceConfigOptions.NULL_FORMAT)
                 .build();
     }
 
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
index 0d58e506da..1c66cc6f8b 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
@@ -78,6 +78,7 @@ public class LocalFileSourceFactory implements TableSourceFactory {
                 .optional(BaseSourceConfigOptions.TIME_FORMAT)
                 .optional(BaseSourceConfigOptions.FILE_FILTER_PATTERN)
                 .optional(BaseSourceConfigOptions.ARCHIVE_COMPRESS_CODEC)
+                .optional(BaseSourceConfigOptions.NULL_FORMAT)
                 .build();
     }
 
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/source/ObsFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/source/ObsFileSourceFactory.java
index e1cd0ee97b..586718b641 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/source/ObsFileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/source/ObsFileSourceFactory.java
@@ -60,6 +60,7 @@ public class ObsFileSourceFactory implements TableSourceFactory {
                 .optional(BaseSourceConfigOptions.DATE_FORMAT)
                 .optional(BaseSourceConfigOptions.DATETIME_FORMAT)
                 .optional(BaseSourceConfigOptions.TIME_FORMAT)
+                .optional(BaseSourceConfigOptions.NULL_FORMAT)
                 .build();
     }
 
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
index 6f140330cc..9e70249be7 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
@@ -83,6 +83,7 @@ public class OssFileSourceFactory implements TableSourceFactory {
                 .optional(BaseSourceConfigOptions.FILE_FILTER_PATTERN)
                 .optional(BaseSourceConfigOptions.COMPRESS_CODEC)
                 .optional(BaseSourceConfigOptions.ARCHIVE_COMPRESS_CODEC)
+                .optional(BaseSourceConfigOptions.NULL_FORMAT)
                 .build();
     }
 
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java
index d1107d46cf..a3376e745e 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java
@@ -87,6 +87,7 @@ public class S3FileSourceFactory implements TableSourceFactory {
                 .optional(BaseSourceConfigOptions.FILE_FILTER_PATTERN)
                 .optional(BaseSourceConfigOptions.COMPRESS_CODEC)
                 .optional(BaseSourceConfigOptions.ARCHIVE_COMPRESS_CODEC)
+                .optional(BaseSourceConfigOptions.NULL_FORMAT)
                 .build();
     }
 
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java
index b4d1d1c63f..8e5fd9ade8 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java
@@ -76,6 +76,7 @@ public class SftpFileSourceFactory implements TableSourceFactory {
                 .optional(BaseSourceConfigOptions.FILE_FILTER_PATTERN)
                 .optional(BaseSourceConfigOptions.COMPRESS_CODEC)
                 .optional(BaseSourceConfigOptions.ARCHIVE_COMPRESS_CODEC)
+                .optional(BaseSourceConfigOptions.NULL_FORMAT)
                 .build();
     }
 
diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceConfig.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceConfig.java
index eba9b5a15b..dd3d9478c1 100644
--- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceConfig.java
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceConfig.java
@@ -46,6 +46,7 @@ import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveTableUtils;
 import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveTypeConvertor;
 
 import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Table;
 
@@ -63,6 +64,7 @@ import java.util.Map;
 import static org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.FIELD_DELIMITER;
 import static org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.FILE_FORMAT_TYPE;
 import static org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.ROW_DELIMITER;
+import static org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions.NULL_FORMAT;
 
 @Getter
 public class HiveSourceConfig implements Serializable {
@@ -122,6 +124,19 @@ public class HiveSourceConfig implements Serializable {
             case TEXT:
                 // if the file format is text, we set the delim.
                 Map<String, String> parameters = table.getSd().getSerdeInfo().getParameters();
+                if (!readonlyConfig.getOptional(NULL_FORMAT).isPresent()) {
+                    String nullFormatKey = "serialization.null.format";
+                    String nullFormat = table.getParameters().get(nullFormatKey);
+                    if (StringUtils.isEmpty(nullFormat)) {
+                        nullFormat = parameters.get(nullFormatKey);
+                    }
+                    if (StringUtils.isEmpty(nullFormat)) {
+                        nullFormat = "\\N";
+                    }
+                    config =
+                            config.withValue(
+                                    NULL_FORMAT.key(), ConfigValueFactory.fromAnyRef(nullFormat));
+                }
                 config =
                         config.withValue(
                                         FIELD_DELIMITER.key(),
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
index ed055a3a30..87e3483df4 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
@@ -268,6 +268,12 @@ public class LocalFileIT extends TestSuiteBase {
                         "/excel/e2e.xlsx",
                         "/seatunnel/read/excel_filter/name=tyrantlucifer/hobby=coding/e2e_filter.xlsx",
                         container);
+
+                ContainerUtil.copyFileIntoContainers(
+                        "/text/e2e_null_format.txt",
+                        "/seatunnel/read/e2e_null_format/e2e_null_format.txt",
+                        container);
+
                 container.execInContainer("mkdir", "-p", "/tmp/fake_empty");
             };
 
@@ -293,6 +299,7 @@ public class LocalFileIT extends TestSuiteBase {
         helper.execute("/text/fake_to_local_file_with_encoding.conf");
         // test read local csv file with assigning encoding
         helper.execute("/text/local_file_text_to_console_with_encoding.conf");
+        helper.execute("/text/local_file_null_format_assert.conf");
 
         // test write local json file
         helper.execute("/json/fake_to_local_file_json.conf");
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/e2e_null_format.txt b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/e2e_null_format.txt
new file mode 100644
index 0000000000..e5c0c117e5
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/e2e_null_format.txt
@@ -0,0 +1,3 @@
+1,a,a,1
+2,a,a,1
+3,a,a,1
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_null_format_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_null_format_assert.conf
new file mode 100644
index 0000000000..a70f83a737
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_null_format_assert.conf
@@ -0,0 +1,95 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  LocalFile {
+    path = "/seatunnel/read/e2e_null_format"
+    file_format_type = "text"
+    delimiter = ","
+    null_format = "a"
+    schema = {
+      fields {
+        f1 = bigint
+        f2 = bigint
+        f3 = string
+        f4 = bigint
+      }
+    }
+  }
+}
+
+sink {
+  Assert {
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 3
+        }
+      ],
+      field_rules = [
+        {
+          field_name = f1
+          field_type = bigint
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = f2
+          field_type = bigint
+          field_value = [
+            {
+              rule_type = NULL
+            }
+          ]
+        },
+        {
+          field_name = f3
+          field_type = string
+          field_value = [
+            {
+              rule_type = NULL
+            }
+          ]
+        },
+        {
+          field_name = f4
+          field_type = bigint
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
+
diff --git a/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextDeserializationSchema.java b/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextDeserializationSchema.java
index 8c06a0e68c..16c4bb93ea 100644
--- a/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextDeserializationSchema.java
+++ b/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextDeserializationSchema.java
@@ -61,6 +61,7 @@ public class TextDeserializationSchema implements DeserializationSchema<SeaTunne
     private final SeaTunnelRowType seaTunnelRowType;
     private final String[] separators;
     private final String encoding;
+    private final String nullFormat;
     private final TextLineSplitor splitor;
     private final CatalogTable catalogTable;
 
@@ -77,11 +78,13 @@ public class TextDeserializationSchema implements DeserializationSchema<SeaTunne
             @NonNull SeaTunnelRowType seaTunnelRowType,
             String[] separators,
             String encoding,
+            String nullFormat,
             TextLineSplitor splitor,
             CatalogTable catalogTable) {
         this.seaTunnelRowType = seaTunnelRowType;
         this.separators = separators;
         this.encoding = encoding;
+        this.nullFormat = nullFormat;
         this.splitor = splitor;
         this.catalogTable = catalogTable;
     }
@@ -99,6 +102,7 @@ public class TextDeserializationSchema implements DeserializationSchema<SeaTunne
                 DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS;
         private TimeUtils.Formatter timeFormatter = TimeUtils.Formatter.HH_MM_SS;
         private String encoding = StandardCharsets.UTF_8.name();
+        private String nullFormat;
         private TextLineSplitor textLineSplitor = new DefaultTextLineSplitor();
 
         private Builder() {}
@@ -143,6 +147,11 @@ public class TextDeserializationSchema implements DeserializationSchema<SeaTunne
             return this;
         }
 
+        public Builder nullFormat(String nullFormat) {
+            this.nullFormat = nullFormat;
+            return this;
+        }
+
         public Builder textLineSplitor(TextLineSplitor splitor) {
             this.textLineSplitor = splitor;
             return this;
@@ -150,7 +159,12 @@ public class TextDeserializationSchema implements DeserializationSchema<SeaTunne
 
         public TextDeserializationSchema build() {
             return new TextDeserializationSchema(
-                    seaTunnelRowType, separators, encoding, textLineSplitor, catalogTable);
+                    seaTunnelRowType,
+                    separators,
+                    encoding,
+                    nullFormat,
+                    textLineSplitor,
+                    catalogTable);
         }
     }
 
@@ -163,9 +177,16 @@ public class TextDeserializationSchema implements DeserializationSchema<SeaTunne
         Map<Integer, String> splitsMap = splitLineBySeaTunnelRowType(content, seaTunnelRowType, 0);
         Object[] objects = new Object[seaTunnelRowType.getTotalFields()];
         for (int i = 0; i < objects.length; i++) {
+            String fieldValue = splitsMap.get(i);
+            if (StringUtils.isBlank(fieldValue)) {
+                continue;
+            }
+            if (StringUtils.equals(fieldValue, nullFormat)) {
+                continue;
+            }
             objects[i] =
                     convert(
-                            splitsMap.get(i),
+                            fieldValue,
                             seaTunnelRowType.getFieldType(i),
                             0,
                             seaTunnelRowType.getFieldNames()[i]);


Reply via email to