This is an automated email from the ASF dual-hosted git repository. wuchunfu pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new 2dbf02df47 [Feature][File] Support config null format for text file read (#8109) 2dbf02df47 is described below commit 2dbf02df47617d4dad6f970fd03952a78c884dfd Author: hailin0 <wanghai...@apache.org> AuthorDate: Wed Nov 27 12:34:46 2024 +0800 [Feature][File] Support config null format for text file read (#8109) --- docs/en/connector-v2/source/FtpFile.md | 10 ++- docs/en/connector-v2/source/HdfsFile.md | 1 + docs/en/connector-v2/source/LocalFile.md | 8 ++ docs/en/connector-v2/source/OssFile.md | 1 + docs/en/connector-v2/source/OssJindoFile.md | 8 ++ docs/en/connector-v2/source/S3File.md | 1 + docs/en/connector-v2/source/SftpFile.md | 3 +- docs/zh/connector-v2/source/HdfsFile.md | 1 + .../assertion/excecutor/AssertExecutor.java | 10 +-- .../file/config/BaseSourceConfigOptions.java | 6 ++ .../file/source/reader/TextReadStrategy.java | 19 +++-- .../file/cos/source/CosFileSourceFactory.java | 1 + .../file/ftp/source/FtpFileSourceFactory.java | 1 + .../file/hdfs/source/HdfsFileSourceFactory.java | 1 + .../oss/jindo/source/OssFileSourceFactory.java | 1 + .../file/local/source/LocalFileSourceFactory.java | 1 + .../file/obs/source/ObsFileSourceFactory.java | 1 + .../file/oss/source/OssFileSourceFactory.java | 1 + .../file/s3/source/S3FileSourceFactory.java | 1 + .../file/sftp/source/SftpFileSourceFactory.java | 1 + .../hive/source/config/HiveSourceConfig.java | 15 ++++ .../e2e/connector/file/local/LocalFileIT.java | 7 ++ .../src/test/resources/text/e2e_null_format.txt | 3 + .../text/local_file_null_format_assert.conf | 95 ++++++++++++++++++++++ .../format/text/TextDeserializationSchema.java | 25 +++++- 25 files changed, 208 insertions(+), 14 deletions(-) diff --git a/docs/en/connector-v2/source/FtpFile.md b/docs/en/connector-v2/source/FtpFile.md index 59f3852cb0..f65255bfd7 100644 --- a/docs/en/connector-v2/source/FtpFile.md +++ b/docs/en/connector-v2/source/FtpFile.md @@ -38,7 +38,7 @@ If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you ## Options -| name | type | required | default value | +| name | type | required | default value | |---------------------------|---------|----------|---------------------| | host | string | yes | - | | port | int | yes | - | @@ -62,6 +62,7 @@ If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you | compress_codec | string | no | none | | archive_compress_codec | string | no | none | | encoding | string | no | UTF-8 | +| null_format | string | no | - | | common-options | | no | - | ### host [string] @@ -336,6 +337,13 @@ The compress codec of archive files and the details that supported as the follow Only used when file_format_type is json,text,csv,xml. The encoding of the file to read. This param will be parsed by `Charset.forName(encoding)`. +### null_format [string] + +Only used when file_format_type is text. +null_format to define which strings can be represented as null. + +e.g: `\N` + ### common options Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details. diff --git a/docs/en/connector-v2/source/HdfsFile.md b/docs/en/connector-v2/source/HdfsFile.md index 161b0e6318..caaf9972a0 100644 --- a/docs/en/connector-v2/source/HdfsFile.md +++ b/docs/en/connector-v2/source/HdfsFile.md @@ -66,6 +66,7 @@ Read data from hdfs file system. | compress_codec | string | no | none | The compress codec of files | | archive_compress_codec | string | no | none | | encoding | string | no | UTF-8 | | +| null_format | string | no | - | Only used when file_format_type is text. null_format to define which strings can be represented as null. e.g: `\N` | | common-options | | no | - | Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details. | ### delimiter/field_delimiter [string] diff --git a/docs/en/connector-v2/source/LocalFile.md b/docs/en/connector-v2/source/LocalFile.md index 8923a03160..477a4d4139 100644 --- a/docs/en/connector-v2/source/LocalFile.md +++ b/docs/en/connector-v2/source/LocalFile.md @@ -62,6 +62,7 @@ If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you | compress_codec | string | no | none | | archive_compress_codec | string | no | none | | encoding | string | no | UTF-8 | +| null_format | string | no | - | | common-options | | no | - | | tables_configs | list | no | used to define a multiple table task | @@ -330,6 +331,13 @@ The compress codec of archive files and the details that supported as the follow Only used when file_format_type is json,text,csv,xml. The encoding of the file to read. This param will be parsed by `Charset.forName(encoding)`. +### null_format [string] + +Only used when file_format_type is text. +null_format to define which strings can be represented as null. + +e.g: `\N` + ### common options Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details diff --git a/docs/en/connector-v2/source/OssFile.md b/docs/en/connector-v2/source/OssFile.md index ae2686b7e8..42163a9d13 100644 --- a/docs/en/connector-v2/source/OssFile.md +++ b/docs/en/connector-v2/source/OssFile.md @@ -211,6 +211,7 @@ If you assign file type to `parquet` `orc`, schema option not required, connecto | xml_use_attr_format | boolean | no | - | Specifies whether to process data using the tag attribute format, only used when file_format is xml. | | compress_codec | string | no | none | Which compress codec the files used. | | encoding | string | no | UTF-8 | +| null_format | string | no | - | Only used when file_format_type is text. null_format to define which strings can be represented as null. e.g: `\N` | | file_filter_pattern | string | no | | Filter pattern, which used for filtering files. | | common-options | config | no | - | Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details. | diff --git a/docs/en/connector-v2/source/OssJindoFile.md b/docs/en/connector-v2/source/OssJindoFile.md index 1db5d62a44..9b83a0b050 100644 --- a/docs/en/connector-v2/source/OssJindoFile.md +++ b/docs/en/connector-v2/source/OssJindoFile.md @@ -72,6 +72,7 @@ It only supports hadoop version **2.9.X+**. | compress_codec | string | no | none | | archive_compress_codec | string | no | none | | encoding | string | no | UTF-8 | +| null_format | string | no | - | | common-options | | no | - | ### path [string] @@ -343,6 +344,13 @@ The compress codec of archive files and the details that supported as the follow Only used when file_format_type is json,text,csv,xml. The encoding of the file to read. This param will be parsed by `Charset.forName(encoding)`. +### null_format [string] + +Only used when file_format_type is text. +null_format to define which strings can be represented as null. + +e.g: `\N` + ### common options Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details. diff --git a/docs/en/connector-v2/source/S3File.md b/docs/en/connector-v2/source/S3File.md index ba4b71cfe9..b0e69cd1e3 100644 --- a/docs/en/connector-v2/source/S3File.md +++ b/docs/en/connector-v2/source/S3File.md @@ -220,6 +220,7 @@ If you assign file type to `parquet` `orc`, schema option not required, connecto | compress_codec | string | no | none | [...] | archive_compress_codec | string | no | none | [...] | encoding | string | no | UTF-8 | [...] +| null_format | string | no | - | Only used when file_format_type is text. null_format to define which strings can be represented as null. e.g: `\N` [...] | file_filter_pattern | string | no | | Filter pattern, which used for filtering files. [...] | common-options | | no | - | Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details. [...] diff --git a/docs/en/connector-v2/source/SftpFile.md b/docs/en/connector-v2/source/SftpFile.md index c6e4aa6c84..f5b76ce305 100644 --- a/docs/en/connector-v2/source/SftpFile.md +++ b/docs/en/connector-v2/source/SftpFile.md @@ -71,7 +71,7 @@ The File does not have a specific type list, and we can indicate which SeaTunnel ## Source Options -| Name | Type | Required | default value | Description | +| Name | Type | Required | default value | Description | |---------------------------|---------|----------|---------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | host | String | Yes | - | The target sftp host is required | | port | Int | Yes | - | The target sftp port is required | @@ -94,6 +94,7 @@ The File does not have a specific type list, and we can indicate which SeaTunnel | compress_codec | String | No | None | The compress codec of files and the details that supported as the following shown: <br/> - txt: `lzo` `None` <br/> - json: `lzo` `None` <br/> - csv: `lzo` `None` <br/> - orc: `lzo` `snappy` `lz4` `zlib` `None` <br/> - parquet: `lzo` `snappy` `lz4` `gzip` `brotli` `zstd` `None` <br/> Tips: excel type does Not support any compression format | | archive_compress_codec | string | no | none | | encoding | string | no | UTF-8 | +| null_format | string | no | - | Only used when file_format_type is text. null_format to define which strings can be represented as null. e.g: `\N` | | common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details. | ### file_filter_pattern [string] diff --git a/docs/zh/connector-v2/source/HdfsFile.md b/docs/zh/connector-v2/source/HdfsFile.md index 9cd254ef80..4de3014f5c 100644 --- a/docs/zh/connector-v2/source/HdfsFile.md +++ b/docs/zh/connector-v2/source/HdfsFile.md @@ -56,6 +56,7 @@ | kerberos_keytab_path | string | 否 | - | kerberos 的 keytab 路径。 | | skip_header_row_number | long | 否 | 0 | 跳过前几行,但仅适用于 txt 和 csv。例如,设置如下:`skip_header_row_number = 2`。然后 Seatunnel 将跳过源文件中的前两行。 | | file_filter_pattern | string | 否 | - | 过滤模式,用于过滤文件。 | +| null_format | string | 否 | - | 定义哪些字符串可以表示为 null,但仅适用于 txt 和 csv. 例如: `\N` | | schema | config | 否 | - | 上游数据的模式字段。 | | sheet_name | string | 否 | - | 读取工作簿的表格,仅在文件格式为 excel 时使用。 | | compress_codec | string | 否 | none | 文件的压缩编解码器。 | diff --git a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/excecutor/AssertExecutor.java b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/excecutor/AssertExecutor.java index f6908b989d..33193cb314 100644 --- a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/excecutor/AssertExecutor.java +++ b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/excecutor/AssertExecutor.java @@ -285,11 +285,11 @@ public class AssertExecutor { private Boolean checkType(Object value, SeaTunnelDataType<?> fieldType) { if (value == null) { - if (fieldType.getSqlType() == SqlType.NULL) { - return true; - } else { - return false; - } + return true; + } + + if (fieldType.getSqlType() == SqlType.NULL) { + return false; } if (fieldType.getSqlType() == SqlType.ROW) { diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfigOptions.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfigOptions.java index de45726e3c..dedddeacbb 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfigOptions.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfigOptions.java @@ -48,6 +48,12 @@ public class BaseSourceConfigOptions { .withDescription( "The separator between columns in a row of data. Only needed by `text` file format"); + public static final Option<String> NULL_FORMAT = + Options.key("null_format") + .stringType() + .noDefaultValue() + .withDescription("The string that represents a null value"); + public static final Option<String> ENCODING = Options.key("encoding") .stringType() diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java index 1a7a7398a4..b13aad765a 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java @@ -157,10 +157,15 @@ public class TextReadStrategy extends AbstractReadStrategy { "When reading json/text/csv files, if user has not specified schema information, " + "SeaTunnel will not support column projection"); } + ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(pluginConfig); TextDeserializationSchema.Builder builder = TextDeserializationSchema.builder() .delimiter(TextFormatConstant.PLACEHOLDER) - .textLineSplitor(textLineSplitor); + .textLineSplitor(textLineSplitor) + .nullFormat( + readonlyConfig + .getOptional(BaseSourceConfigOptions.NULL_FORMAT) + .orElse(null)); if (isMergePartition) { deserializationSchema = builder.seaTunnelRowType(this.seaTunnelRowTypeWithPartition).build(); @@ -175,11 +180,11 @@ public class TextReadStrategy extends AbstractReadStrategy { SeaTunnelRowType rowType = catalogTable.getSeaTunnelRowType(); SeaTunnelRowType userDefinedRowTypeWithPartition = mergePartitionTypes(fileNames.get(0), rowType); + ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(pluginConfig); Optional<String> fieldDelimiterOptional = - ReadonlyConfig.fromConfig(pluginConfig) - .getOptional(BaseSourceConfigOptions.FIELD_DELIMITER); + readonlyConfig.getOptional(BaseSourceConfigOptions.FIELD_DELIMITER); encoding = - ReadonlyConfig.fromConfig(pluginConfig) + readonlyConfig .getOptional(BaseSourceConfigOptions.ENCODING) .orElse(StandardCharsets.UTF_8.name()); if (fieldDelimiterOptional.isPresent()) { @@ -198,7 +203,11 @@ public class TextReadStrategy extends AbstractReadStrategy { TextDeserializationSchema.Builder builder = TextDeserializationSchema.builder() .delimiter(fieldDelimiter) - .textLineSplitor(textLineSplitor); + .textLineSplitor(textLineSplitor) + .nullFormat( + readonlyConfig + .getOptional(BaseSourceConfigOptions.NULL_FORMAT) + .orElse(null)); if (isMergePartition) { deserializationSchema = builder.seaTunnelRowType(userDefinedRowTypeWithPartition).build(); diff --git a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSourceFactory.java index 388d245047..ab519d299d 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSourceFactory.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSourceFactory.java @@ -72,6 +72,7 @@ public class CosFileSourceFactory implements TableSourceFactory { .optional(BaseSourceConfigOptions.FILE_FILTER_PATTERN) .optional(BaseSourceConfigOptions.COMPRESS_CODEC) .optional(BaseSourceConfigOptions.ARCHIVE_COMPRESS_CODEC) + .optional(BaseSourceConfigOptions.NULL_FORMAT) .build(); } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java index 9bb4b98e05..3411e9c407 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java @@ -83,6 +83,7 @@ public class FtpFileSourceFactory implements TableSourceFactory { .optional(BaseSourceConfigOptions.COMPRESS_CODEC) .optional(FtpConfigOptions.FTP_CONNECTION_MODE) .optional(BaseSourceConfigOptions.ARCHIVE_COMPRESS_CODEC) + .optional(BaseSourceConfigOptions.NULL_FORMAT) .build(); } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java index 88e4684180..f09dba2c1f 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java @@ -69,6 +69,7 @@ public class HdfsFileSourceFactory implements TableSourceFactory { .optional(BaseSourceConfigOptions.FILE_FILTER_PATTERN) .optional(BaseSourceConfigOptions.COMPRESS_CODEC) .optional(BaseSourceConfigOptions.ARCHIVE_COMPRESS_CODEC) + .optional(BaseSourceConfigOptions.NULL_FORMAT) .build(); } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/source/OssFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/source/OssFileSourceFactory.java index a6c9276c76..f195ac3071 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/source/OssFileSourceFactory.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/source/OssFileSourceFactory.java @@ -71,6 +71,7 @@ public class OssFileSourceFactory implements TableSourceFactory { .optional(BaseSourceConfigOptions.TIME_FORMAT) .optional(BaseSourceConfigOptions.FILE_FILTER_PATTERN) .optional(BaseSourceConfigOptions.COMPRESS_CODEC) + .optional(BaseSourceConfigOptions.NULL_FORMAT) .build(); } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java index 0d58e506da..1c66cc6f8b 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java @@ -78,6 +78,7 @@ public class LocalFileSourceFactory implements TableSourceFactory { .optional(BaseSourceConfigOptions.TIME_FORMAT) .optional(BaseSourceConfigOptions.FILE_FILTER_PATTERN) .optional(BaseSourceConfigOptions.ARCHIVE_COMPRESS_CODEC) + .optional(BaseSourceConfigOptions.NULL_FORMAT) .build(); } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/source/ObsFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/source/ObsFileSourceFactory.java index e1cd0ee97b..586718b641 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/source/ObsFileSourceFactory.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/source/ObsFileSourceFactory.java @@ -60,6 +60,7 @@ public class ObsFileSourceFactory implements TableSourceFactory { .optional(BaseSourceConfigOptions.DATE_FORMAT) .optional(BaseSourceConfigOptions.DATETIME_FORMAT) .optional(BaseSourceConfigOptions.TIME_FORMAT) + .optional(BaseSourceConfigOptions.NULL_FORMAT) .build(); } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java index 6f140330cc..9e70249be7 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java @@ -83,6 +83,7 @@ public class OssFileSourceFactory implements TableSourceFactory { .optional(BaseSourceConfigOptions.FILE_FILTER_PATTERN) .optional(BaseSourceConfigOptions.COMPRESS_CODEC) .optional(BaseSourceConfigOptions.ARCHIVE_COMPRESS_CODEC) + .optional(BaseSourceConfigOptions.NULL_FORMAT) .build(); } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java index d1107d46cf..a3376e745e 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java @@ -87,6 +87,7 @@ public class S3FileSourceFactory implements TableSourceFactory { .optional(BaseSourceConfigOptions.FILE_FILTER_PATTERN) .optional(BaseSourceConfigOptions.COMPRESS_CODEC) .optional(BaseSourceConfigOptions.ARCHIVE_COMPRESS_CODEC) + .optional(BaseSourceConfigOptions.NULL_FORMAT) .build(); } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java index b4d1d1c63f..8e5fd9ade8 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java @@ -76,6 +76,7 @@ public class SftpFileSourceFactory implements TableSourceFactory { .optional(BaseSourceConfigOptions.FILE_FILTER_PATTERN) .optional(BaseSourceConfigOptions.COMPRESS_CODEC) .optional(BaseSourceConfigOptions.ARCHIVE_COMPRESS_CODEC) + .optional(BaseSourceConfigOptions.NULL_FORMAT) .build(); } diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceConfig.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceConfig.java index eba9b5a15b..dd3d9478c1 100644 --- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceConfig.java +++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceConfig.java @@ -46,6 +46,7 @@ import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveTableUtils; import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveTypeConvertor; import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Table; @@ -63,6 +64,7 @@ import java.util.Map; import static org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.FIELD_DELIMITER; import static org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.FILE_FORMAT_TYPE; import static org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.ROW_DELIMITER; +import static org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions.NULL_FORMAT; @Getter public class HiveSourceConfig implements Serializable { @@ -122,6 +124,19 @@ public class HiveSourceConfig implements Serializable { case TEXT: // if the file format is text, we set the delim. Map<String, String> parameters = table.getSd().getSerdeInfo().getParameters(); + if (!readonlyConfig.getOptional(NULL_FORMAT).isPresent()) { + String nullFormatKey = "serialization.null.format"; + String nullFormat = table.getParameters().get(nullFormatKey); + if (StringUtils.isEmpty(nullFormat)) { + nullFormat = parameters.get(nullFormatKey); + } + if (StringUtils.isEmpty(nullFormat)) { + nullFormat = "\\N"; + } + config = + config.withValue( + NULL_FORMAT.key(), ConfigValueFactory.fromAnyRef(nullFormat)); + } config = config.withValue( FIELD_DELIMITER.key(), diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java index ed055a3a30..87e3483df4 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java @@ -268,6 +268,12 @@ public class LocalFileIT extends TestSuiteBase { "/excel/e2e.xlsx", "/seatunnel/read/excel_filter/name=tyrantlucifer/hobby=coding/e2e_filter.xlsx", container); + + ContainerUtil.copyFileIntoContainers( + "/text/e2e_null_format.txt", + "/seatunnel/read/e2e_null_format/e2e_null_format.txt", + container); + container.execInContainer("mkdir", "-p", "/tmp/fake_empty"); }; @@ -293,6 +299,7 @@ public class LocalFileIT extends TestSuiteBase { helper.execute("/text/fake_to_local_file_with_encoding.conf"); // test read local csv file with assigning encoding helper.execute("/text/local_file_text_to_console_with_encoding.conf"); + helper.execute("/text/local_file_null_format_assert.conf"); // test write local json file helper.execute("/json/fake_to_local_file_json.conf"); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/e2e_null_format.txt b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/e2e_null_format.txt new file mode 100644 index 0000000000..e5c0c117e5 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/e2e_null_format.txt @@ -0,0 +1,3 @@ +1,a,a,1 +2,a,a,1 +3,a,a,1 \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_null_format_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_null_format_assert.conf new file mode 100644 index 0000000000..a70f83a737 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_null_format_assert.conf @@ -0,0 +1,95 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "BATCH" + spark.app.name = "SeaTunnel" + spark.executor.instances = 2 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + LocalFile { + path = "/seatunnel/read/e2e_null_format" + file_format_type = "text" + delimiter = "," + null_format = "a" + schema = { + fields { + f1 = bigint + f2 = bigint + f3 = string + f4 = bigint + } + } + } +} + +sink { + Assert { + rules { + row_rules = [ + { + rule_type = MAX_ROW + rule_value = 3 + } + ], + field_rules = [ + { + field_name = f1 + field_type = bigint + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = f2 + field_type = bigint + field_value = [ + { + rule_type = NULL + } + ] + }, + { + field_name = f3 + field_type = string + field_value = [ + { + rule_type = NULL + } + ] + }, + { + field_name = f4 + field_type = bigint + field_value = [ + { + rule_type = NOT_NULL + } + ] + } + ] + } + } +} + diff --git a/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextDeserializationSchema.java b/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextDeserializationSchema.java index 8c06a0e68c..16c4bb93ea 100644 --- a/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextDeserializationSchema.java +++ b/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextDeserializationSchema.java @@ -61,6 +61,7 @@ public class TextDeserializationSchema implements DeserializationSchema<SeaTunne private final SeaTunnelRowType seaTunnelRowType; private final String[] separators; private final String encoding; + private final String nullFormat; private final TextLineSplitor splitor; private final CatalogTable catalogTable; @@ -77,11 +78,13 @@ public class TextDeserializationSchema implements DeserializationSchema<SeaTunne @NonNull SeaTunnelRowType seaTunnelRowType, String[] separators, String encoding, + String nullFormat, TextLineSplitor splitor, CatalogTable catalogTable) { this.seaTunnelRowType = seaTunnelRowType; this.separators = separators; this.encoding = encoding; + this.nullFormat = nullFormat; this.splitor = splitor; this.catalogTable = catalogTable; } @@ -99,6 +102,7 @@ public class TextDeserializationSchema implements DeserializationSchema<SeaTunne DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS; private TimeUtils.Formatter timeFormatter = TimeUtils.Formatter.HH_MM_SS; private String encoding = StandardCharsets.UTF_8.name(); + private String nullFormat; private TextLineSplitor textLineSplitor = new DefaultTextLineSplitor(); private Builder() {} @@ -143,6 +147,11 @@ public class TextDeserializationSchema implements DeserializationSchema<SeaTunne return this; } + public Builder nullFormat(String nullFormat) { + this.nullFormat = nullFormat; + return this; + } + public Builder textLineSplitor(TextLineSplitor splitor) { this.textLineSplitor = splitor; return this; @@ -150,7 +159,12 @@ public class TextDeserializationSchema implements DeserializationSchema<SeaTunne public TextDeserializationSchema build() { return new TextDeserializationSchema( - seaTunnelRowType, separators, encoding, textLineSplitor, catalogTable); + seaTunnelRowType, + separators, + encoding, + nullFormat, + textLineSplitor, + catalogTable); } } @@ -163,9 +177,16 @@ public class TextDeserializationSchema implements DeserializationSchema<SeaTunne Map<Integer, String> splitsMap = splitLineBySeaTunnelRowType(content, seaTunnelRowType, 0); Object[] objects = new Object[seaTunnelRowType.getTotalFields()]; for (int i = 0; i < objects.length; i++) { + String fieldValue = splitsMap.get(i); + if (StringUtils.isBlank(fieldValue)) { + continue; + } + if (StringUtils.equals(fieldValue, nullFormat)) { + continue; + } objects[i] = convert( - splitsMap.get(i), + fieldValue, seaTunnelRowType.getFieldType(i), 0, seaTunnelRowType.getFieldNames()[i]);