This is an automated email from the ASF dual-hosted git repository.
tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 717620f54 [Improve][Connector-V2][Hive] Support read text table &
Column projection (#4105)
717620f54 is described below
commit 717620f5423a329cee162c4cf9aa244ed1b33169
Author: Tyrantlucifer <[email protected]>
AuthorDate: Mon Feb 20 10:26:53 2023 +0800
[Improve][Connector-V2][Hive] Support read text table & Column projection
(#4105)
* [Improve][Connector-V2][Hive] Support read text table
* [Improve][Connector-V2][Hive] Format code with spotless
* [Improve][Connector-V2][Hive] Fix unit test
* [Improve][Connector-V2][Hive] Fix integration test
* [Improve][Connector-V2][Hive] Optimize text format
* [Improve][Connector-V2][Hive] Update e2e.txt
* [Improve][Connector-V2][Hive] Optimize kerberos authentication
* [Improve][Connector-V2][Hive] Improve config check logic for hive
connectors
* [Improve][Connector-V2][Hive] Support projection
* [Improve][Connector-V2][Hive] Add transient keyword
* [Improve][Connector-V2][Hive] Support parquet/orc column projection
* [Improve][Connector-V2][Hive] Optimize code
* [Improve][Connector-V2][Hive] Optimize code
* [Improve][Connector-V2][Hive] Support text/json/csv column projection
* [Improve][Connector-V2][Hive] Update docs
* [Improve][Connector-V2][Hive] Update release-note
* [Improve][Connector-V2][Hive] Add interface flag
* [Improve][Connector-V2][Hive] Remove useless method
* [Improve][Connector-V2][Hive] Update docs
* [Improve][Connector-V2][Hive] Update docs
---
.../connector-v2/Error-Quick-Reference-Manual.md | 1 +
docs/en/connector-v2/source/FtpFile.md | 17 ++-
docs/en/connector-v2/source/HdfsFile.md | 17 ++-
docs/en/connector-v2/source/Hive.md | 5 +
docs/en/connector-v2/source/LocalFile.md | 15 +++
docs/en/connector-v2/source/OssFile.md | 17 ++-
docs/en/connector-v2/source/OssJindoFile.md | 15 +++
docs/en/connector-v2/source/S3File.md | 17 ++-
docs/en/connector-v2/source/SftpFile.md | 16 ++-
release-note.md | 2 +
.../file/hdfs/source/BaseHdfsFileSource.java | 6 +
.../seatunnel/file/config/BaseSinkConfig.java | 3 +-
.../seatunnel/file/config/BaseSourceConfig.java | 9 +-
.../file/exception/FileConnectorErrorCode.java | 3 +-
.../seatunnel/file/sink/util/FileSystemUtils.java | 34 +++++
.../file/sink/writer/AbstractWriteStrategy.java | 28 +---
.../file/sink/writer/OrcWriteStrategy.java | 2 +-
.../file/sink/writer/ParquetWriteStrategy.java | 2 +-
.../seatunnel/file/source/BaseFileSource.java | 4 +-
.../file/source/reader/AbstractReadStrategy.java | 38 ++----
.../file/source/reader/OrcReadStrategy.java | 35 +++--
.../file/source/reader/ParquetReadStrategy.java | 28 ++--
.../file/source/reader/TextReadStrategy.java | 114 ++++++++++------
.../seatunnel/file/writer/OrcReadStrategyTest.java | 57 +++++++-
.../file/writer/ParquetReadStrategyTest.java | 90 ++++++++++++-
.../src/test/resources/hive.parquet | Bin 0 -> 3472 bytes
.../src/test/resources/test_read_orc.conf | 20 +++
.../src/test/resources/test_read_parquet.conf | 20 +++
.../src/test/resources/test_read_parquet2.conf | 20 +++
.../src/test/resources/timestamp_as_int64.parquet | Bin 0 -> 10297 bytes
.../{test.parquet => timestamp_as_int96.parquet} | Bin
.../hive/commit/HiveSinkAggregatedCommitter.java | 3 +
.../connectors/seatunnel/hive/sink/HiveSink.java | 36 +++++-
.../seatunnel/hive/source/HiveSource.java | 84 +++++++++++-
.../seatunnel/hive/utils/HiveMetaStoreProxy.java | 34 +----
.../seatunnel/kafka/source/KafkaSource.java | 3 +-
.../e2e/connector/file/local/LocalFileIT.java | 12 ++
.../src/test/resources/orc/e2e.orc | Bin 5471 -> 5730 bytes
...nf => local_file_orc_projection_to_assert.conf} | 23 +---
.../resources/orc/local_file_orc_to_assert.conf | 22 +---
.../local_file_parquet_projection_to_assert.conf} | 19 +--
.../src/test/resources/text/e2e.txt | 10 +-
.../local_file_text_projection_to_assert.conf} | 39 +++++-
.../format/text/TextDeserializationSchema.java | 144 +++++++++++++--------
.../format/text/TextSerializationSchema.java | 114 ++++++++++++++--
.../format/text/constant/TextFormatConstant.java | 28 +---
.../format/text/TextFormatSchemaTest.java | 60 ++++++---
47 files changed, 922 insertions(+), 344 deletions(-)
diff --git a/docs/en/connector-v2/Error-Quick-Reference-Manual.md
b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
index 01871328c..1c1fbb456 100644
--- a/docs/en/connector-v2/Error-Quick-Reference-Manual.md
+++ b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
@@ -155,6 +155,7 @@ problems encountered by users.
| FILE-01 | File type is invalid | When users encounter this error
code, it means that this file is not in the format that the user assigned,
please check it |
| FILE-02 | Data deserialization failed | When users encounter this error
code, it means that the data from the files does not satisfy the schema that
the user assigned, please check whether the data from the files is correct |
| FILE-03 | Get file list failed | When users encounter this error
code, it means that the connector tried to traverse the path and get the file
list but failed, please check whether the file system is working |
+| FILE-04 | File list is empty | When users encounter this error
code, it means that the path the user wants to sync is empty, please check the
file path |
## Doris Connector Error Codes
diff --git a/docs/en/connector-v2/source/FtpFile.md
b/docs/en/connector-v2/source/FtpFile.md
index 0edf6cbc1..124fac7a0 100644
--- a/docs/en/connector-v2/source/FtpFile.md
+++ b/docs/en/connector-v2/source/FtpFile.md
@@ -19,7 +19,7 @@ If you use SeaTunnel Engine, It automatically integrated the
hadoop jar when you
- [x] [batch](../../concept/connector-v2-features.md)
- [ ] [stream](../../concept/connector-v2-features.md)
- [ ] [exactly-once](../../concept/connector-v2-features.md)
-- [ ] [column projection](../../concept/connector-v2-features.md)
+- [x] [column projection](../../concept/connector-v2-features.md)
- [x] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)
- [x] file format
@@ -37,6 +37,7 @@ If you use SeaTunnel Engine, It automatically integrated the
hadoop jar when you
| password | string | yes | - |
| path | string | yes | - |
| type | string | yes | - |
+| read_columns | list | no | - |
| delimiter | string | no | \001 |
| parse_partition_from_path | boolean | no | true |
| date_format | string | no | yyyy-MM-dd |
@@ -124,6 +125,20 @@ then Seatunnel will skip the first 2 lines from source
files
The schema information of upstream data.
+### read_columns [list]
+
+The read column list of the data source; the user can use it to implement
field projection.
+
+The file types that support column projection are shown as follows:
+
+- text
+- json
+- csv
+- orc
+- parquet
+
+**Tips: If the user wants to use this feature when reading `text` `json` `csv`
files, the schema option must be configured**
+
### type [string]
File type, supported as the following file types:
diff --git a/docs/en/connector-v2/source/HdfsFile.md
b/docs/en/connector-v2/source/HdfsFile.md
index e82831675..cde7be449 100644
--- a/docs/en/connector-v2/source/HdfsFile.md
+++ b/docs/en/connector-v2/source/HdfsFile.md
@@ -22,7 +22,7 @@ If you use SeaTunnel Engine, It automatically integrated the
hadoop jar when you
Read all the data in a split in a pollNext call. What splits are read will be
saved in snapshot.
-- [ ] [column projection](../../concept/connector-v2-features.md)
+- [x] [column projection](../../concept/connector-v2-features.md)
- [x] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)
- [x] file format
@@ -39,6 +39,7 @@ Read all the data in a split in a pollNext call. What splits
are read will be sa
| path | string | yes | - |
| type | string | yes | - |
| fs.defaultFS | string | yes | - |
+| read_columns | list | no | - |
| hdfs_site_path | string | no | - |
| delimiter | string | no | \001 |
| parse_partition_from_path | boolean | no | true |
@@ -219,6 +220,20 @@ The keytab path of kerberos
the schema fields of upstream data
+### read_columns [list]
+
+The read column list of the data source; the user can use it to implement
field projection.
+
+The file types that support column projection are shown as follows:
+
+- text
+- json
+- csv
+- orc
+- parquet
+
+**Tips: If the user wants to use this feature when reading `text` `json` `csv`
files, the schema option must be configured**
+
### common options
Source plugin common parameters, please refer to [Source Common
Options](common-options.md) for details.
diff --git a/docs/en/connector-v2/source/Hive.md
b/docs/en/connector-v2/source/Hive.md
index fc0023539..413473c7f 100644
--- a/docs/en/connector-v2/source/Hive.md
+++ b/docs/en/connector-v2/source/Hive.md
@@ -41,6 +41,7 @@ Read all the data in a split in a pollNext call. What splits
are read will be sa
| kerberos_keytab_path | string | no | - |
| hdfs_site_path | string | no | - |
| read_partitions | list | no | - |
+| read_columns | list | no | - |
| common-options | | no | - |
### table_name [string]
@@ -70,6 +71,10 @@ The principal of kerberos authentication
The keytab file path of kerberos authentication
+### read_columns [list]
+
+The read column list of the data source; the user can use it to implement
field projection.
+
### common options
Source plugin common parameters, please refer to [Source Common
Options](common-options.md) for details
diff --git a/docs/en/connector-v2/source/LocalFile.md
b/docs/en/connector-v2/source/LocalFile.md
index a2f15805f..6a74d18ab 100644
--- a/docs/en/connector-v2/source/LocalFile.md
+++ b/docs/en/connector-v2/source/LocalFile.md
@@ -38,6 +38,7 @@ Read all the data in a split in a pollNext call. What splits
are read will be sa
|---------------------------|---------|----------|---------------------|
| path | string | yes | - |
| type | string | yes | - |
+| read_columns | list | no | - |
| delimiter | string | no | \001 |
| parse_partition_from_path | boolean | no | true |
| date_format | string | no | yyyy-MM-dd |
@@ -199,6 +200,20 @@ connector will generate data as the following:
The schema information of upstream data.
+### read_columns [list]
+
+The read column list of the data source; the user can use it to implement
field projection.
+
+The file types that support column projection are shown as follows:
+
+- text
+- json
+- csv
+- orc
+- parquet
+
+**Tips: If the user wants to use this feature when reading `text` `json` `csv`
files, the schema option must be configured**
+
### common options
Source plugin common parameters, please refer to [Source Common
Options](common-options.md) for details
diff --git a/docs/en/connector-v2/source/OssFile.md
b/docs/en/connector-v2/source/OssFile.md
index a822fa11a..0ca6eeb50 100644
--- a/docs/en/connector-v2/source/OssFile.md
+++ b/docs/en/connector-v2/source/OssFile.md
@@ -25,7 +25,7 @@ It only supports hadoop version **2.9.X+**.
Read all the data in a split in a pollNext call. What splits are read will be
saved in snapshot.
-- [ ] [column projection](../../concept/connector-v2-features.md)
+- [x] [column projection](../../concept/connector-v2-features.md)
- [x] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)
- [x] file format
@@ -45,6 +45,7 @@ Read all the data in a split in a pollNext call. What splits
are read will be sa
| access_key | string | yes | - |
| access_secret | string | yes | - |
| endpoint | string | yes | - |
+| read_columns | list | no | - |
| delimiter | string | no | \001 |
| parse_partition_from_path | boolean | no | true |
| skip_header_row_number | long | no | 0 |
@@ -222,6 +223,20 @@ The endpoint of oss file system.
The schema of upstream data.
+### read_columns [list]
+
+The read column list of the data source; the user can use it to implement
field projection.
+
+The file types that support column projection are shown as follows:
+
+- text
+- json
+- csv
+- orc
+- parquet
+
+**Tips: If the user wants to use this feature when reading `text` `json` `csv`
files, the schema option must be configured**
+
### common options
Source plugin common parameters, please refer to [Source Common
Options](common-options.md) for details.
diff --git a/docs/en/connector-v2/source/OssJindoFile.md
b/docs/en/connector-v2/source/OssJindoFile.md
index 9c966fe39..fef93da5b 100644
--- a/docs/en/connector-v2/source/OssJindoFile.md
+++ b/docs/en/connector-v2/source/OssJindoFile.md
@@ -45,6 +45,7 @@ Read all the data in a split in a pollNext call. What splits
are read will be sa
| access_key | string | yes | - |
| access_secret | string | yes | - |
| endpoint | string | yes | - |
+| read_columns | list | no | - |
| delimiter | string | no | \001 |
| parse_partition_from_path | boolean | no | true |
| date_format | string | no | yyyy-MM-dd |
@@ -222,6 +223,20 @@ The endpoint of oss file system.
The schema of upstream data.
+### read_columns [list]
+
+The read column list of the data source; the user can use it to implement
field projection.
+
+The file types that support column projection are shown as follows:
+
+- text
+- json
+- csv
+- orc
+- parquet
+
+**Tips: If the user wants to use this feature when reading `text` `json` `csv`
files, the schema option must be configured**
+
### common options
Source plugin common parameters, please refer to [Source Common
Options](common-options.md) for details.
diff --git a/docs/en/connector-v2/source/S3File.md
b/docs/en/connector-v2/source/S3File.md
index 27fe779a6..65611e770 100644
--- a/docs/en/connector-v2/source/S3File.md
+++ b/docs/en/connector-v2/source/S3File.md
@@ -24,7 +24,7 @@ To use this connector you need put hadoop-aws-3.1.4.jar and
aws-java-sdk-bundle-
Read all the data in a split in a pollNext call. What splits are read will be
saved in snapshot.
-- [ ] [column projection](../../concept/connector-v2-features.md)
+- [x] [column projection](../../concept/connector-v2-features.md)
- [x] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)
- [x] file format
@@ -43,6 +43,7 @@ Read all the data in a split in a pollNext call. What splits
are read will be sa
| bucket | string | yes | -
|
| fs.s3a.endpoint | string | yes | -
|
| fs.s3a.aws.credentials.provider | string | yes |
com.amazonaws.auth.InstanceProfileCredentialsProvider |
+| read_columns | list | no | -
|
| access_key | string | no | -
|
| access_secret | string | no | -
|
| hadoop_s3_properties | map | no | -
|
@@ -239,6 +240,20 @@ hadoop_s3_properties {
The schema of upstream data.
+### read_columns [list]
+
+The read column list of the data source; the user can use it to implement
field projection.
+
+The file types that support column projection are shown as follows:
+
+- text
+- json
+- csv
+- orc
+- parquet
+
+**Tips: If the user wants to use this feature when reading `text` `json` `csv`
files, the schema option must be configured**
+
### common options
Source plugin common parameters, please refer to [Source Common
Options](common-options.md) for details.
diff --git a/docs/en/connector-v2/source/SftpFile.md
b/docs/en/connector-v2/source/SftpFile.md
index 80d77deaf..1563874ae 100644
--- a/docs/en/connector-v2/source/SftpFile.md
+++ b/docs/en/connector-v2/source/SftpFile.md
@@ -19,7 +19,7 @@ If you use SeaTunnel Engine, It automatically integrated the
hadoop jar when you
- [x] [batch](../../concept/connector-v2-features.md)
- [ ] [stream](../../concept/connector-v2-features.md)
- [ ] [exactly-once](../../concept/connector-v2-features.md)
-- [ ] [column projection](../../concept/connector-v2-features.md)
+- [x] [column projection](../../concept/connector-v2-features.md)
- [x] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)
- [x] file format
@@ -124,6 +124,20 @@ then Seatunnel will skip the first 2 lines from source
files
The schema information of upstream data.
+### read_columns [list]
+
+The read column list of the data source; the user can use it to implement
field projection.
+
+The file types that support column projection are shown as follows:
+
+- text
+- json
+- csv
+- orc
+- parquet
+
+**Tips: If the user wants to use this feature when reading `text` `json` `csv`
files, the schema option must be configured**
+
### type [string]
File type, supported as the following file types:
diff --git a/release-note.md b/release-note.md
index d0a337566..0ed18a641 100644
--- a/release-note.md
+++ b/release-note.md
@@ -38,6 +38,8 @@
- [API]Add parallelism and column projection interface #3829
- [API]Add get source method to all source connector #3846
- [Hive] Support read user-defined partitions #3842
+- [Hive] Support read text table & Column projection #4105
+- [File] Support column projection #4105
### Zeta Engine
- [Chore] Remove unnecessary dependencies #3795
- [Core] Improve job restart of all node down #3784
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
index c78c3fdba..70f159afe 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
@@ -78,6 +78,12 @@ public abstract class BaseHdfsFileSource extends
BaseFileSource {
throw new FileConnectorException(
FileConnectorErrorCode.FILE_LIST_GET_FAILED, errorMsg, e);
}
+ if (filePaths.isEmpty()) {
+ throw new FileConnectorException(
+ FileConnectorErrorCode.FILE_LIST_EMPTY,
+ "The target file list is empty,"
+ + "SeaTunnel will not be able to sync empty
table");
+ }
// support user-defined schema
FileFormat fileFormat =
FileFormat.valueOf(
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
index 2533b10f1..43403f632 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.common.utils.DateTimeUtils;
import org.apache.seatunnel.common.utils.DateUtils;
import org.apache.seatunnel.common.utils.TimeUtils;
+import org.apache.seatunnel.format.text.constant.TextFormatConstant;
import java.util.Arrays;
import java.util.List;
@@ -31,7 +32,7 @@ public class BaseSinkConfig {
public static final String NON_PARTITION = "NON_PARTITION";
public static final String TRANSACTION_ID_SPLIT = "_";
public static final String TRANSACTION_EXPRESSION = "transactionId";
- public static final String DEFAULT_FIELD_DELIMITER =
String.valueOf('\001');
+ public static final String DEFAULT_FIELD_DELIMITER =
TextFormatConstant.SEPARATOR[0];
public static final String DEFAULT_ROW_DELIMITER = "\n";
public static final String DEFAULT_PARTITION_DIR_EXPRESSION =
"${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/";
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java
index 5b8ea6c16..747b972aa 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.common.utils.DateTimeUtils;
import org.apache.seatunnel.common.utils.DateUtils;
import org.apache.seatunnel.common.utils.TimeUtils;
+import org.apache.seatunnel.format.text.constant.TextFormatConstant;
import java.util.List;
@@ -41,7 +42,7 @@ public class BaseSourceConfig {
public static final Option<String> DELIMITER =
Options.key("delimiter")
.stringType()
- .defaultValue(String.valueOf('\001'))
+ .defaultValue(TextFormatConstant.SEPARATOR[0])
.withDescription(
"The separator between columns in a row of data.
Only needed by `text` file format");
@@ -98,4 +99,10 @@ public class BaseSourceConfig {
.listType()
.noDefaultValue()
.withDescription("The partitions that the user want to
read");
+
+ public static final Option<List<String>> READ_COLUMNS =
+ Options.key("read_columns")
+ .listType()
+ .noDefaultValue()
+ .withDescription("The columns list that the user want to
read");
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FileConnectorErrorCode.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FileConnectorErrorCode.java
index e966ebcd5..65e9590f3 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FileConnectorErrorCode.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FileConnectorErrorCode.java
@@ -22,7 +22,8 @@ import
org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
public enum FileConnectorErrorCode implements SeaTunnelErrorCode {
FILE_TYPE_INVALID("FILE-01", "File type is invalid"),
DATA_DESERIALIZE_FAILED("FILE-02", "Data deserialization failed"),
- FILE_LIST_GET_FAILED("FILE-03", "Get file list failed");
+ FILE_LIST_GET_FAILED("FILE-03", "Get file list failed"),
+ FILE_LIST_EMPTY("FILE-04", "File list is empty");
private final String code;
private final String description;
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java
index 15b05796e..b0993f17a 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java
@@ -21,12 +21,14 @@ import
org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
@@ -49,12 +51,44 @@ public class FileSystemUtils implements Serializable {
this.hadoopConf = hadoopConf;
}
+ public static void doKerberosAuthentication(
+ Configuration configuration, String principal, String keytabPath) {
+ if (StringUtils.isBlank(principal) || StringUtils.isBlank(keytabPath))
{
+ log.warn(
+ "Principal [{}] or keytabPath [{}] is empty, it will skip
kerberos authentication",
+ principal,
+ keytabPath);
+ } else {
+ configuration.set("hadoop.security.authentication", "kerberos");
+ UserGroupInformation.setConfiguration(configuration);
+ try {
+ log.info(
+ "Start Kerberos authentication using principal {} and
keytab {}",
+ principal,
+ keytabPath);
+ UserGroupInformation.loginUserFromKeytab(principal,
keytabPath);
+ log.info("Kerberos authentication successful");
+ } catch (IOException e) {
+ String errorMsg =
+ String.format(
+ "Kerberos authentication failed using this "
+ + "principal [%s] and keytab path
[%s]",
+ principal, keytabPath);
+ throw new FileConnectorException(
+ CommonErrorCode.KERBEROS_AUTHORIZED_FAILED, errorMsg,
e);
+ }
+ }
+ }
+
public Configuration getConfiguration(HadoopConf hadoopConf) {
Configuration configuration = new Configuration();
configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY,
hadoopConf.getHdfsNameKey());
configuration.set(
String.format("fs.%s.impl", hadoopConf.getSchema()),
hadoopConf.getFsHdfsImpl());
hadoopConf.setExtraOptionsForConfiguration(configuration);
+ String principal = hadoopConf.getKerberosPrincipal();
+ String keytabPath = hadoopConf.getKerberosKeytabPath();
+ doKerberosAuthentication(configuration, principal, keytabPath);
return configuration;
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
index d0078f18b..6820d28d8 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
@@ -37,7 +37,6 @@ import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -154,31 +153,8 @@ public abstract class AbstractWriteStrategy implements
WriteStrategy {
this.hadoopConf.setExtraOptionsForConfiguration(configuration);
String principal = hadoopConf.getKerberosPrincipal();
String keytabPath = hadoopConf.getKerberosKeytabPath();
- if (!isKerberosAuthorization && StringUtils.isNotBlank(principal)) {
- // kerberos authentication and only once
- if (StringUtils.isBlank(keytabPath)) {
- throw new FileConnectorException(
- CommonErrorCode.KERBEROS_AUTHORIZED_FAILED,
- "Kerberos keytab path is blank, please check this
parameter that in your config file");
- }
- configuration.set("hadoop.security.authentication", "kerberos");
- UserGroupInformation.setConfiguration(configuration);
- try {
- log.info(
- "Start Kerberos authentication using principal {} and
keytab {}",
- principal,
- keytabPath);
- UserGroupInformation.loginUserFromKeytab(principal,
keytabPath);
- log.info("Kerberos authentication successful");
- } catch (IOException e) {
- String errorMsg =
- String.format(
- "Kerberos authentication failed using this "
- + "principal [%s] and keytab path
[%s]",
- principal, keytabPath);
- throw new FileConnectorException(
- CommonErrorCode.KERBEROS_AUTHORIZED_FAILED, errorMsg,
e);
- }
+ if (!isKerberosAuthorization) {
+ FileSystemUtils.doKerberosAuthentication(configuration, principal,
keytabPath);
isKerberosAuthorization = true;
}
return configuration;
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
index 75c9ae34b..551d02f5b 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
@@ -135,7 +135,7 @@ public class OrcWriteStrategy extends AbstractWriteStrategy
{
return writer;
}
- private TypeDescription buildFieldWithRowType(SeaTunnelDataType<?> type) {
+ public static TypeDescription buildFieldWithRowType(SeaTunnelDataType<?>
type) {
switch (type.getSqlType()) {
case ARRAY:
BasicType<?> elementType = ((ArrayType<?, ?>)
type).getElementType();
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
index 17966c9d1..ce104da80 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
@@ -224,7 +224,7 @@ public class ParquetWriteStrategy extends
AbstractWriteStrategy {
}
@SuppressWarnings("checkstyle:MagicNumber")
- private Type seaTunnelDataType2ParquetDataType(
+ public static Type seaTunnelDataType2ParquetDataType(
String fieldName, SeaTunnelDataType<?> seaTunnelDataType) {
switch (seaTunnelDataType.getSqlType()) {
case ARRAY:
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseFileSource.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseFileSource.java
index d0596017f..c6034f06b 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseFileSource.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseFileSource.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.source.SupportColumnProjection;
import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -35,7 +36,8 @@ import java.util.List;
public abstract class BaseFileSource
implements SeaTunnelSource<SeaTunnelRow, FileSourceSplit,
FileSourceState>,
- SupportParallelism {
+ SupportParallelism,
+ SupportColumnProjection {
protected SeaTunnelRowType rowType;
protected ReadStrategy readStrategy;
protected HadoopConf hadoopConf;
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
index 35c14670c..0bd638a42 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
@@ -22,18 +22,15 @@ import
org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
-import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
-import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.UserGroupInformation;
import lombok.extern.slf4j.Slf4j;
@@ -72,6 +69,7 @@ public abstract class AbstractReadStrategy implements
ReadStrategy {
protected Config pluginConfig;
protected List<String> fileNames = new ArrayList<>();
protected List<String> readPartitions = new ArrayList<>();
+ protected List<String> readColumns = new ArrayList<>();
protected boolean isMergePartition = true;
protected long skipHeaderNumber =
BaseSourceConfig.SKIP_HEADER_ROW_NUMBER.defaultValue();
protected boolean isKerberosAuthorization = false;
@@ -94,38 +92,15 @@ public abstract class AbstractReadStrategy implements
ReadStrategy {
configuration.setBoolean(READ_INT96_AS_FIXED, true);
configuration.setBoolean(WRITE_FIXED_AS_INT96, true);
configuration.setBoolean(ADD_LIST_ELEMENT_RECORDS, false);
- configuration.setBoolean(WRITE_OLD_LIST_STRUCTURE, false);
+ configuration.setBoolean(WRITE_OLD_LIST_STRUCTURE, true);
configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY,
hadoopConf.getHdfsNameKey());
configuration.set(
String.format("fs.%s.impl", hadoopConf.getSchema()),
hadoopConf.getFsHdfsImpl());
hadoopConf.setExtraOptionsForConfiguration(configuration);
String principal = hadoopConf.getKerberosPrincipal();
String keytabPath = hadoopConf.getKerberosKeytabPath();
- if (!isKerberosAuthorization && StringUtils.isNotBlank(principal)) {
- // kerberos authentication and only once
- if (StringUtils.isBlank(keytabPath)) {
- throw new FileConnectorException(
- CommonErrorCode.KERBEROS_AUTHORIZED_FAILED,
- "Kerberos keytab path is blank, please check this
parameter that in your config file");
- }
- configuration.set("hadoop.security.authentication", "kerberos");
- UserGroupInformation.setConfiguration(configuration);
- try {
- log.info(
- "Start Kerberos authentication using principal {} and
keytab {}",
- principal,
- keytabPath);
- UserGroupInformation.loginUserFromKeytab(principal,
keytabPath);
- log.info("Kerberos authentication successful");
- } catch (IOException e) {
- String errorMsg =
- String.format(
- "Kerberos authentication failed using this "
- + "principal [%s] and keytab path
[%s]",
- principal, keytabPath);
- throw new FileConnectorException(
- CommonErrorCode.KERBEROS_AUTHORIZED_FAILED, errorMsg,
e);
- }
+ if (!isKerberosAuthorization) {
+ FileSystemUtils.doKerberosAuthentication(configuration, principal,
keytabPath);
isKerberosAuthorization = true;
}
return configuration;
@@ -187,6 +162,9 @@ public abstract class AbstractReadStrategy implements
ReadStrategy {
readPartitions.addAll(
pluginConfig.getStringList(BaseSourceConfig.READ_PARTITIONS.key()));
}
+ if (pluginConfig.hasPath(BaseSourceConfig.READ_COLUMNS.key())) {
+
readColumns.addAll(pluginConfig.getStringList(BaseSourceConfig.READ_COLUMNS.key()));
+ }
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java
index d23d4222f..d191c3a83 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java
@@ -67,6 +67,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.OrcWriteStrategy.buildFieldWithRowType;
+
@Slf4j
public class OrcReadStrategy extends AbstractReadStrategy {
private static final long MIN_SIZE = 16 * 1024;
@@ -86,10 +88,15 @@ public class OrcReadStrategy extends AbstractReadStrategy {
Map<String, String> partitionsMap = parsePartitionsByPath(path);
OrcFile.ReaderOptions readerOptions =
OrcFile.readerOptions(configuration);
try (Reader reader = OrcFile.createReader(filePath, readerOptions)) {
- TypeDescription schema = reader.getSchema();
+ TypeDescription schema = TypeDescription.createStruct();
+ for (int i = 0; i < seaTunnelRowType.getTotalFields(); i++) {
+ TypeDescription typeDescription =
+
buildFieldWithRowType(seaTunnelRowType.getFieldType(i));
+ schema.addField(seaTunnelRowType.getFieldName(i),
typeDescription);
+ }
List<TypeDescription> children = schema.getChildren();
- RecordReader rows = reader.rows();
- VectorizedRowBatch rowBatch = reader.getSchema().createRowBatch();
+ RecordReader rows = reader.rows(reader.options().schema(schema));
+ VectorizedRowBatch rowBatch = schema.createRowBatch();
while (rows.nextBatch(rowBatch)) {
int num = 0;
for (int i = 0; i < rowBatch.size; i++) {
@@ -128,11 +135,23 @@ public class OrcReadStrategy extends AbstractReadStrategy
{
Path dstDir = new Path(path);
try (Reader reader = OrcFile.createReader(dstDir, readerOptions)) {
TypeDescription schema = reader.getSchema();
- String[] fields = new String[schema.getFieldNames().size()];
- SeaTunnelDataType<?>[] types = new
SeaTunnelDataType[schema.getFieldNames().size()];
- for (int i = 0; i < schema.getFieldNames().size(); i++) {
- fields[i] = schema.getFieldNames().get(i);
- types[i] =
orcDataType2SeaTunnelDataType(schema.getChildren().get(i));
+ List<String> fieldNames = schema.getFieldNames();
+ if (readColumns.isEmpty()) {
+ readColumns.addAll(fieldNames);
+ }
+ String[] fields = new String[readColumns.size()];
+ SeaTunnelDataType<?>[] types = new
SeaTunnelDataType[readColumns.size()];
+ for (int i = 0; i < readColumns.size(); i++) {
+ fields[i] = readColumns.get(i);
+ int index = fieldNames.indexOf(readColumns.get(i));
+ if (index == -1) {
+ throw new FileConnectorException(
+ CommonErrorCode.TABLE_SCHEMA_GET_FAILED,
+ String.format(
+ "Column [%s] does not exists in table
schema [%s]",
+ readColumns.get(i), String.join(",",
fieldNames)));
+ }
+ types[i] =
orcDataType2SeaTunnelDataType(schema.getChildren().get(index));
}
seaTunnelRowType = new SeaTunnelRowType(fields, types);
seaTunnelRowTypeWithPartition = mergePartitionTypes(path,
seaTunnelRowType);
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
index 7762cbfa9..424e181f8 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
@@ -78,6 +78,8 @@ public class ParquetReadStrategy extends AbstractReadStrategy
{
private static final long MILLIS_PER_DAY = TimeUnit.DAYS.toMillis(1L);
private static final long JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH = 2440588;
+ private int[] indexes;
+
@Override
public void read(String path, Collector<SeaTunnelRow> output)
throws FileConnectorException, IOException {
@@ -113,7 +115,7 @@ public class ParquetReadStrategy extends
AbstractReadStrategy {
fields = new Object[fieldsCount];
}
for (int i = 0; i < fieldsCount; i++) {
- Object data = record.get(i);
+ Object data = record.get(indexes[i]);
fields[i] = resolveObject(data,
seaTunnelRowType.getFieldType(i));
}
SeaTunnelRow seaTunnelRow = new SeaTunnelRow(fields);
@@ -236,15 +238,21 @@ public class ParquetReadStrategy extends
AbstractReadStrategy {
throw new
FileConnectorException(CommonErrorCode.READER_OPERATION_FAILED, errorMsg, e);
}
FileMetaData fileMetaData = metadata.getFileMetaData();
- MessageType schema = fileMetaData.getSchema();
- int fieldCount = schema.getFieldCount();
- String[] fields = new String[fieldCount];
- SeaTunnelDataType<?>[] types = new SeaTunnelDataType[fieldCount];
- for (int i = 0; i < fieldCount; i++) {
- fields[i] = schema.getFieldName(i);
- Type type = schema.getType(i);
- SeaTunnelDataType<?> fieldType = parquetType2SeaTunnelType(type);
- types[i] = fieldType;
+ MessageType originalSchema = fileMetaData.getSchema();
+ if (readColumns.isEmpty()) {
+ for (int i = 0; i < originalSchema.getFieldCount(); i++) {
+ readColumns.add(originalSchema.getFieldName(i));
+ }
+ }
+ String[] fields = new String[readColumns.size()];
+ SeaTunnelDataType<?>[] types = new
SeaTunnelDataType[readColumns.size()];
+ indexes = new int[readColumns.size()];
+ for (int i = 0; i < readColumns.size(); i++) {
+ fields[i] = readColumns.get(i);
+ Type type = originalSchema.getType(fields[i]);
+ int fieldIndex = originalSchema.getFieldIndex(fields[i]);
+ indexes[i] = fieldIndex;
+ types[i] = parquetType2SeaTunnelType(type);
}
seaTunnelRowType = new SeaTunnelRowType(fields, types);
seaTunnelRowTypeWithPartition = mergePartitionTypes(path,
seaTunnelRowType);
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
index ae57d33dc..0ea1d7c41 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
@@ -17,8 +17,10 @@
package org.apache.seatunnel.connectors.seatunnel.file.source.reader;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.utils.DateTimeUtils;
@@ -31,6 +33,7 @@ import
org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import org.apache.seatunnel.format.text.TextDeserializationSchema;
+import org.apache.seatunnel.format.text.constant.TextFormatConstant;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -44,10 +47,12 @@ import java.util.Map;
public class TextReadStrategy extends AbstractReadStrategy {
private DeserializationSchema<SeaTunnelRow> deserializationSchema;
- private String fieldDelimiter = String.valueOf('\001');
- private DateUtils.Formatter dateFormat = DateUtils.Formatter.YYYY_MM_DD;
- private DateTimeUtils.Formatter datetimeFormat =
DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS;
- private TimeUtils.Formatter timeFormat = TimeUtils.Formatter.HH_MM_SS;
+ private String fieldDelimiter = BaseSourceConfig.DELIMITER.defaultValue();
+ private DateUtils.Formatter dateFormat =
BaseSourceConfig.DATE_FORMAT.defaultValue();
+ private DateTimeUtils.Formatter datetimeFormat =
+ BaseSourceConfig.DATETIME_FORMAT.defaultValue();
+ private TimeUtils.Formatter timeFormat =
BaseSourceConfig.TIME_FORMAT.defaultValue();
+ private int[] indexes;
@Override
public void read(String path, Collector<SeaTunnelRow> output)
@@ -66,6 +71,22 @@ public class TextReadStrategy extends AbstractReadStrategy {
try {
SeaTunnelRow seaTunnelRow =
deserializationSchema.deserialize(line.getBytes());
+ if (!readColumns.isEmpty()) {
+ // need column projection
+ Object[] fields;
+ if (isMergePartition) {
+ fields =
+ new Object
+ [readColumns.size()
+ +
partitionsMap.size()];
+ } else {
+ fields = new
Object[readColumns.size()];
+ }
+ for (int i = 0; i < indexes.length;
i++) {
+ fields[i] =
seaTunnelRow.getField(indexes[i]);
+ }
+ seaTunnelRow = new
SeaTunnelRow(fields);
+ }
if (isMergePartition) {
int index =
seaTunnelRowType.getTotalFields();
for (String value :
partitionsMap.values()) {
@@ -89,29 +110,35 @@ public class TextReadStrategy extends AbstractReadStrategy
{
@Override
public SeaTunnelRowType getSeaTunnelRowTypeInfo(HadoopConf hadoopConf,
String path) {
- SeaTunnelRowType simpleSeaTunnelType =
SeaTunnelSchema.buildSimpleTextSchema();
- this.seaTunnelRowType = simpleSeaTunnelType;
+ this.seaTunnelRowType = SeaTunnelSchema.buildSimpleTextSchema();
this.seaTunnelRowTypeWithPartition =
- mergePartitionTypes(fileNames.get(0), simpleSeaTunnelType);
+ mergePartitionTypes(fileNames.get(0), seaTunnelRowType);
+ initFormatter();
+ if (pluginConfig.hasPath(BaseSourceConfig.READ_COLUMNS.key())) {
+ throw new FileConnectorException(
+ SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+ "When reading json/text/csv files, if user has not
specified schema information, "
+ + "SeaTunnel will not support column projection");
+ }
+ TextDeserializationSchema.Builder builder =
+ TextDeserializationSchema.builder()
+ .delimiter(TextFormatConstant.PLACEHOLDER)
+ .dateFormatter(dateFormat)
+ .dateTimeFormatter(datetimeFormat)
+ .timeFormatter(timeFormat);
if (isMergePartition) {
deserializationSchema =
- TextDeserializationSchema.builder()
-
.seaTunnelRowType(this.seaTunnelRowTypeWithPartition)
- .delimiter(String.valueOf('\002'))
- .build();
+
builder.seaTunnelRowType(this.seaTunnelRowTypeWithPartition).build();
} else {
- deserializationSchema =
- TextDeserializationSchema.builder()
- .seaTunnelRowType(this.seaTunnelRowType)
- .delimiter(String.valueOf('\002'))
- .build();
+ deserializationSchema =
builder.seaTunnelRowType(this.seaTunnelRowType).build();
}
return getActualSeaTunnelRowTypeInfo();
}
@Override
public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
- super.setSeaTunnelRowTypeInfo(seaTunnelRowType);
+ SeaTunnelRowType userDefinedRowTypeWithPartition =
+ mergePartitionTypes(fileNames.get(0), seaTunnelRowType);
if (pluginConfig.hasPath(BaseSourceConfig.DELIMITER.key())) {
fieldDelimiter =
pluginConfig.getString(BaseSourceConfig.DELIMITER.key());
} else {
@@ -122,6 +149,40 @@ public class TextReadStrategy extends AbstractReadStrategy
{
fieldDelimiter = ",";
}
}
+ initFormatter();
+ TextDeserializationSchema.Builder builder =
+ TextDeserializationSchema.builder()
+ .delimiter(fieldDelimiter)
+ .dateFormatter(dateFormat)
+ .dateTimeFormatter(datetimeFormat)
+ .timeFormatter(timeFormat);
+ if (isMergePartition) {
+ deserializationSchema =
+
builder.seaTunnelRowType(userDefinedRowTypeWithPartition).build();
+ } else {
+ deserializationSchema =
builder.seaTunnelRowType(seaTunnelRowType).build();
+ }
+ // column projection
+ if (pluginConfig.hasPath(BaseSourceConfig.READ_COLUMNS.key())) {
+ // get the read column index from user-defined row type
+ indexes = new int[readColumns.size()];
+ String[] fields = new String[readColumns.size()];
+ SeaTunnelDataType<?>[] types = new
SeaTunnelDataType[readColumns.size()];
+ for (int i = 0; i < indexes.length; i++) {
+ indexes[i] = seaTunnelRowType.indexOf(readColumns.get(i));
+ fields[i] = seaTunnelRowType.getFieldName(indexes[i]);
+ types[i] = seaTunnelRowType.getFieldType(indexes[i]);
+ }
+ this.seaTunnelRowType = new SeaTunnelRowType(fields, types);
+ this.seaTunnelRowTypeWithPartition =
+ mergePartitionTypes(fileNames.get(0),
this.seaTunnelRowType);
+ } else {
+ this.seaTunnelRowType = seaTunnelRowType;
+ this.seaTunnelRowTypeWithPartition =
userDefinedRowTypeWithPartition;
+ }
+ }
+
+ private void initFormatter() {
if (pluginConfig.hasPath(BaseSourceConfig.DATE_FORMAT.key())) {
dateFormat =
DateUtils.Formatter.parse(
@@ -137,24 +198,5 @@ public class TextReadStrategy extends AbstractReadStrategy
{
TimeUtils.Formatter.parse(
pluginConfig.getString(BaseSourceConfig.TIME_FORMAT.key()));
}
- if (isMergePartition) {
- deserializationSchema =
- TextDeserializationSchema.builder()
-
.seaTunnelRowType(this.seaTunnelRowTypeWithPartition)
- .delimiter(fieldDelimiter)
- .dateFormatter(dateFormat)
- .dateTimeFormatter(datetimeFormat)
- .timeFormatter(timeFormat)
- .build();
- } else {
- deserializationSchema =
- TextDeserializationSchema.builder()
- .seaTunnelRowType(this.seaTunnelRowType)
- .delimiter(fieldDelimiter)
- .dateFormatter(dateFormat)
- .dateTimeFormatter(datetimeFormat)
- .timeFormatter(timeFormat)
- .build();
- }
}
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/OrcReadStrategyTest.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/OrcReadStrategyTest.java
index 26f4b267a..5d72ae2e0 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/OrcReadStrategyTest.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/OrcReadStrategyTest.java
@@ -17,16 +17,23 @@
package org.apache.seatunnel.connectors.seatunnel.file.writer;
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import
org.apache.seatunnel.connectors.seatunnel.file.source.reader.OrcReadStrategy;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import java.io.File;
import java.net.URL;
import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
import static
org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT;
@@ -34,23 +41,63 @@ public class OrcReadStrategyTest {
@Test
public void testOrcRead() throws Exception {
- URL resource = OrcReadStrategyTest.class.getResource("/test.orc");
- assert resource != null;
- String path = Paths.get(resource.toURI()).toString();
+ URL orcFile = OrcReadStrategyTest.class.getResource("/test.orc");
+ Assertions.assertNotNull(orcFile);
+ String orcFilePath = Paths.get(orcFile.toURI()).toString();
OrcReadStrategy orcReadStrategy = new OrcReadStrategy();
LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
orcReadStrategy.init(localConf);
TestCollector testCollector = new TestCollector();
- orcReadStrategy.read(path, testCollector);
+ SeaTunnelRowType seaTunnelRowTypeInfo =
+ orcReadStrategy.getSeaTunnelRowTypeInfo(localConf,
orcFilePath);
+ Assertions.assertNotNull(seaTunnelRowTypeInfo);
+ System.out.println(seaTunnelRowTypeInfo);
+ orcReadStrategy.read(orcFilePath, testCollector);
+ for (SeaTunnelRow row : testCollector.getRows()) {
+ Assertions.assertEquals(row.getField(0).getClass(), Boolean.class);
+ Assertions.assertEquals(row.getField(1).getClass(), Byte.class);
+ Assertions.assertEquals(row.getField(16).getClass(),
SeaTunnelRow.class);
+ }
+ }
+
+ @Test
+ public void testOrcReadProjection() throws Exception {
+ URL orcFile = OrcReadStrategyTest.class.getResource("/test.orc");
+ URL conf =
OrcReadStrategyTest.class.getResource("/test_read_orc.conf");
+ Assertions.assertNotNull(orcFile);
+ Assertions.assertNotNull(conf);
+ String orcFilePath = Paths.get(orcFile.toURI()).toString();
+ String confPath = Paths.get(conf.toURI()).toString();
+ OrcReadStrategy orcReadStrategy = new OrcReadStrategy();
+ LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+ Config pluginConfig = ConfigFactory.parseFile(new File(confPath));
+ orcReadStrategy.init(localConf);
+ orcReadStrategy.setPluginConfig(pluginConfig);
+ TestCollector testCollector = new TestCollector();
+ SeaTunnelRowType seaTunnelRowTypeInfo =
+ orcReadStrategy.getSeaTunnelRowTypeInfo(localConf,
orcFilePath);
+ Assertions.assertNotNull(seaTunnelRowTypeInfo);
+ System.out.println(seaTunnelRowTypeInfo);
+ orcReadStrategy.read(orcFilePath, testCollector);
+ for (SeaTunnelRow row : testCollector.getRows()) {
+ Assertions.assertEquals(row.getField(0).getClass(), Byte.class);
+ Assertions.assertEquals(row.getField(1).getClass(), Boolean.class);
+ }
}
public static class TestCollector implements Collector<SeaTunnelRow> {
+ private final List<SeaTunnelRow> rows = new ArrayList<>();
+
+ public List<SeaTunnelRow> getRows() {
+ return rows;
+ }
+
@SuppressWarnings("checkstyle:RegexpSingleline")
@Override
public void collect(SeaTunnelRow record) {
System.out.println(record);
- Assertions.assertEquals(record.getField(16).getClass(),
SeaTunnelRow.class);
+ rows.add(record);
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java
index 5775c3659..de1d8d932 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java
@@ -17,41 +17,123 @@
package org.apache.seatunnel.connectors.seatunnel.file.writer;
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import
org.apache.seatunnel.connectors.seatunnel.file.source.reader.ParquetReadStrategy;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import java.io.File;
import java.net.URL;
import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
import static
org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT;
public class ParquetReadStrategyTest {
@Test
- public void testParquetRead() throws Exception {
- URL resource =
ParquetReadStrategyTest.class.getResource("/test.parquet");
- assert resource != null;
+ public void testParquetRead1() throws Exception {
+ URL resource =
ParquetReadStrategyTest.class.getResource("/timestamp_as_int64.parquet");
+ Assertions.assertNotNull(resource);
+ String path = Paths.get(resource.toURI()).toString();
+ ParquetReadStrategy parquetReadStrategy = new ParquetReadStrategy();
+ LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+ parquetReadStrategy.init(localConf);
+ SeaTunnelRowType seaTunnelRowTypeInfo =
+ parquetReadStrategy.getSeaTunnelRowTypeInfo(localConf, path);
+ Assertions.assertNotNull(seaTunnelRowTypeInfo);
+ System.out.println(seaTunnelRowTypeInfo);
+ TestCollector testCollector = new TestCollector();
+ parquetReadStrategy.read(path, testCollector);
+ }
+
+ @Test
+ public void testParquetRead2() throws Exception {
+ URL resource =
ParquetReadStrategyTest.class.getResource("/hive.parquet");
+ Assertions.assertNotNull(resource);
+ String path = Paths.get(resource.toURI()).toString();
+ ParquetReadStrategy parquetReadStrategy = new ParquetReadStrategy();
+ LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+ parquetReadStrategy.init(localConf);
+ SeaTunnelRowType seaTunnelRowTypeInfo =
+ parquetReadStrategy.getSeaTunnelRowTypeInfo(localConf, path);
+ Assertions.assertNotNull(seaTunnelRowTypeInfo);
+ System.out.println(seaTunnelRowTypeInfo);
+ TestCollector testCollector = new TestCollector();
+ parquetReadStrategy.read(path, testCollector);
+ }
+
+ @Test
+ public void testParquetReadProjection1() throws Exception {
+ URL resource =
ParquetReadStrategyTest.class.getResource("/timestamp_as_int96.parquet");
+ URL conf =
OrcReadStrategyTest.class.getResource("/test_read_parquet.conf");
+ Assertions.assertNotNull(resource);
+ Assertions.assertNotNull(conf);
String path = Paths.get(resource.toURI()).toString();
+ String confPath = Paths.get(conf.toURI()).toString();
+ Config pluginConfig = ConfigFactory.parseFile(new File(confPath));
ParquetReadStrategy parquetReadStrategy = new ParquetReadStrategy();
LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
parquetReadStrategy.init(localConf);
+ parquetReadStrategy.setPluginConfig(pluginConfig);
SeaTunnelRowType seaTunnelRowTypeInfo =
parquetReadStrategy.getSeaTunnelRowTypeInfo(localConf, path);
- assert seaTunnelRowTypeInfo != null;
+ Assertions.assertNotNull(seaTunnelRowTypeInfo);
+ System.out.println(seaTunnelRowTypeInfo);
+ TestCollector testCollector = new TestCollector();
+ parquetReadStrategy.read(path, testCollector);
+ List<SeaTunnelRow> rows = testCollector.getRows();
+ for (SeaTunnelRow row : rows) {
+ Assertions.assertEquals(row.getField(0).getClass(), Long.class);
+ Assertions.assertEquals(row.getField(1).getClass(), Byte.class);
+ Assertions.assertEquals(row.getField(2).getClass(), Short.class);
+ Assertions.assertEquals(row.getField(0), 40000000000L);
+ Assertions.assertEquals(row.getField(1), (byte) 1);
+ Assertions.assertEquals(row.getField(2), (short) 1);
+ }
+ }
+
+ @Test
+ public void testParquetReadProjection2() throws Exception {
+ URL resource =
ParquetReadStrategyTest.class.getResource("/hive.parquet");
+ URL conf =
OrcReadStrategyTest.class.getResource("/test_read_parquet2.conf");
+ Assertions.assertNotNull(resource);
+ Assertions.assertNotNull(conf);
+ String path = Paths.get(resource.toURI()).toString();
+ String confPath = Paths.get(conf.toURI()).toString();
+ Config pluginConfig = ConfigFactory.parseFile(new File(confPath));
+ ParquetReadStrategy parquetReadStrategy = new ParquetReadStrategy();
+ LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+ parquetReadStrategy.init(localConf);
+ parquetReadStrategy.setPluginConfig(pluginConfig);
+ SeaTunnelRowType seaTunnelRowTypeInfo =
+ parquetReadStrategy.getSeaTunnelRowTypeInfo(localConf, path);
+ Assertions.assertNotNull(seaTunnelRowTypeInfo);
+ System.out.println(seaTunnelRowTypeInfo);
TestCollector testCollector = new TestCollector();
parquetReadStrategy.read(path, testCollector);
}
public static class TestCollector implements Collector<SeaTunnelRow> {
+ private final List<SeaTunnelRow> rows = new ArrayList<>();
+
+ public List<SeaTunnelRow> getRows() {
+ return rows;
+ }
+
@SuppressWarnings("checkstyle:RegexpSingleline")
@Override
public void collect(SeaTunnelRow record) {
System.out.println(record);
+ rows.add(record);
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/hive.parquet
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/hive.parquet
new file mode 100644
index 000000000..19a08e296
Binary files /dev/null and
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/hive.parquet
differ
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/test_read_orc.conf
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/test_read_orc.conf
new file mode 100644
index 000000000..2f0bcc922
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/test_read_orc.conf
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+{
+ read_columns = [tinyint_col, boolean_col]
+}
\ No newline at end of file
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/test_read_parquet.conf
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/test_read_parquet.conf
new file mode 100644
index 000000000..45d30b161
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/test_read_parquet.conf
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+{
+ read_columns = [test_bigint, test_tinyint, test_smallint]
+}
\ No newline at end of file
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/test_read_parquet2.conf
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/test_read_parquet2.conf
new file mode 100644
index 000000000..9472c80ab
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/test_read_parquet2.conf
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+{
+ read_columns = [test_array, test_map]
+}
\ No newline at end of file
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/timestamp_as_int64.parquet
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/timestamp_as_int64.parquet
new file mode 100644
index 000000000..2c111e2bb
Binary files /dev/null and
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/timestamp_as_int64.parquet
differ
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/test.parquet
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/timestamp_as_int96.parquet
similarity index 100%
rename from
seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/test.parquet
rename to
seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/timestamp_as_int96.parquet
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java
index ed6c6700c..7d7c271e1 100644
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java
@@ -24,6 +24,7 @@ import
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkAggreg
import
org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreProxy;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.thrift.TException;
import lombok.extern.slf4j.Slf4j;
@@ -63,6 +64,8 @@ public class HiveSinkAggregatedCommitter extends
FileSinkAggregatedCommitter {
try {
hiveMetaStore.addPartitions(dbName, tableName, partitions);
log.info("Add these partitions {}", partitions);
+ } catch (AlreadyExistsException e) {
+            log.warn("These partitions {} already exist",
partitions);
} catch (TException e) {
log.error("Failed to add these partitions {}", partitions,
e);
errorCommitInfos.add(aggregatedCommitInfo);
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
index 31a6a8a04..b773bc4bd 100644
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
@@ -28,7 +28,6 @@ import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
-import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import
org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink.BaseHdfsFileSink;
@@ -58,12 +57,16 @@ import static
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConf
import static
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.FILE_FORMAT;
import static
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.FILE_NAME_EXPRESSION;
import static
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.FILE_PATH;
+import static
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.HAVE_PARTITION;
import static
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.IS_PARTITION_FIELD_WRITE_IN_FILE;
import static
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.PARTITION_BY;
+import static
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.PARTITION_DIR_EXPRESSION;
import static
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.ROW_DELIMITER;
import static
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.SINK_COLUMNS;
+import static
org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.METASTORE_URI;
import static
org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.ORC_OUTPUT_FORMAT_CLASSNAME;
import static
org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.PARQUET_OUTPUT_FORMAT_CLASSNAME;
+import static
org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.TABLE_NAME;
import static
org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.TEXT_OUTPUT_FORMAT_CLASSNAME;
@AutoService(SeaTunnelSink.class)
@@ -80,8 +83,7 @@ public class HiveSink extends BaseHdfsFileSink {
@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
CheckResult result =
- CheckConfigUtil.checkAllExists(
- pluginConfig, HiveConfig.METASTORE_URI.key(),
HiveConfig.TABLE_NAME.key());
+ CheckConfigUtil.checkAllExists(pluginConfig,
METASTORE_URI.key(), TABLE_NAME.key());
if (!result.isSuccess()) {
throw new HiveConnectorException(
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
@@ -89,12 +91,34 @@ public class HiveSink extends BaseHdfsFileSink {
"PluginName: %s, PluginType: %s, Message: %s",
getPluginName(), PluginType.SINK,
result.getMsg()));
}
- if
(pluginConfig.hasPath(BaseSinkConfig.PARTITION_DIR_EXPRESSION.key())) {
+ result =
+ CheckConfigUtil.checkAtLeastOneExists(
+ pluginConfig,
+ FILE_FORMAT.key(),
+ FILE_PATH.key(),
+ FIELD_DELIMITER.key(),
+ ROW_DELIMITER.key(),
+ IS_PARTITION_FIELD_WRITE_IN_FILE.key(),
+ PARTITION_DIR_EXPRESSION.key(),
+ HAVE_PARTITION.key(),
+ SINK_COLUMNS.key(),
+ PARTITION_BY.key());
+ if (result.isSuccess()) {
throw new HiveConnectorException(
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
String.format(
- "Hive sink connector does not support setting %s",
- BaseSinkConfig.PARTITION_DIR_EXPRESSION.key()));
+                            "Hive sink connector does not support these
settings [%s]",
+ String.join(
+ ",",
+ FILE_FORMAT.key(),
+ FILE_PATH.key(),
+ FIELD_DELIMITER.key(),
+ ROW_DELIMITER.key(),
+ IS_PARTITION_FIELD_WRITE_IN_FILE.key(),
+ PARTITION_DIR_EXPRESSION.key(),
+ HAVE_PARTITION.key(),
+ SINK_COLUMNS.key(),
+ PARTITION_BY.key())));
}
Pair<String[], Table> tableInfo =
HiveConfig.getTableInfo(pluginConfig);
dbName = tableInfo.getLeft()[0];
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
index c250b0a81..b4b5311f7 100644
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
@@ -17,16 +17,21 @@
package org.apache.seatunnel.connectors.seatunnel.hive.source;
+import
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigRenderOptions;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.table.type.SqlType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import
org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.BaseHdfsFileSource;
@@ -35,15 +40,21 @@ import
org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorErr
import
org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorException;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Table;
import com.google.auto.service.AutoService;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
import static
org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
+import static
org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema.SCHEMA;
+import static
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig.FILE_PATH;
+import static
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig.FILE_TYPE;
import static
org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.ORC_INPUT_FORMAT_CLASSNAME;
import static
org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.PARQUET_INPUT_FORMAT_CLASSNAME;
import static
org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.TEXT_INPUT_FORMAT_CLASSNAME;
@@ -69,6 +80,25 @@ public class HiveSource extends BaseHdfsFileSource {
"PluginName: %s, PluginType: %s, Message: %s",
getPluginName(), PluginType.SOURCE,
result.getMsg()));
}
+ result =
+ CheckConfigUtil.checkAtLeastOneExists(
+ pluginConfig,
+ SCHEMA.key(),
+ FILE_TYPE.key(),
+ FILE_PATH.key(),
+ FS_DEFAULT_NAME_KEY);
+ if (result.isSuccess()) {
+ throw new HiveConnectorException(
+ SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+ String.format(
+                            "Hive source connector does not support these
settings [%s]",
+ String.join(
+ ",",
+ SCHEMA.key(),
+ FILE_TYPE.key(),
+ FILE_PATH.key(),
+ FS_DEFAULT_NAME_KEY)));
+ }
if (pluginConfig.hasPath(BaseSourceConfig.READ_PARTITIONS.key())) {
// verify partition list
List<String> partitionsList =
@@ -96,17 +126,26 @@ public class HiveSource extends BaseHdfsFileSource {
if (TEXT_INPUT_FORMAT_CLASSNAME.equals(inputFormat)) {
pluginConfig =
pluginConfig.withValue(
- BaseSourceConfig.FILE_TYPE.key(),
+ FILE_TYPE.key(),
ConfigValueFactory.fromAnyRef(FileFormat.TEXT.toString()));
+ // Build schema from hive table information
+ // Because the entrySet in typesafe config couldn't keep key-value
order
+ // So use jackson to keep key-value order
+ Map<String, Object> schema = parseSchema(tableInformation);
+ ConfigRenderOptions options = ConfigRenderOptions.concise();
+ String render = pluginConfig.root().render(options);
+ ObjectNode jsonNodes = JsonUtils.parseObject(render);
+ jsonNodes.putPOJO(SCHEMA.key(), schema);
+ pluginConfig = ConfigFactory.parseString(jsonNodes.toString());
} else if (PARQUET_INPUT_FORMAT_CLASSNAME.equals(inputFormat)) {
pluginConfig =
pluginConfig.withValue(
- BaseSourceConfig.FILE_TYPE.key(),
+ FILE_TYPE.key(),
ConfigValueFactory.fromAnyRef(FileFormat.PARQUET.toString()));
} else if (ORC_INPUT_FORMAT_CLASSNAME.equals(inputFormat)) {
pluginConfig =
pluginConfig.withValue(
- BaseSourceConfig.FILE_TYPE.key(),
+ FILE_TYPE.key(),
ConfigValueFactory.fromAnyRef(FileFormat.ORC.toString()));
} else {
throw new HiveConnectorException(
@@ -136,4 +175,43 @@ public class HiveSource extends BaseHdfsFileSource {
}
super.prepare(pluginConfig);
}
+
+ private Map<String, Object> parseSchema(Table table) {
+ LinkedHashMap<String, Object> fields = new LinkedHashMap<>();
+ LinkedHashMap<String, Object> schema = new LinkedHashMap<>();
+ List<FieldSchema> cols = table.getSd().getCols();
+ for (FieldSchema col : cols) {
+ String name = col.getName();
+ String type = col.getType();
+ fields.put(name, covertHiveTypeToSeaTunnelType(type));
+ }
+ schema.put("fields", fields);
+ return schema;
+ }
+
+ private Object covertHiveTypeToSeaTunnelType(String hiveType) {
+ if (hiveType.contains("varchar")) {
+ return SqlType.STRING;
+ }
+ if (hiveType.contains("char")) {
+ throw new HiveConnectorException(
+ CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+                    "SeaTunnel hive connector does not support char type in
text table");
+ }
+ if (hiveType.contains("binary")) {
+ return SqlType.BYTES.name();
+ }
+ if (hiveType.contains("struct")) {
+ LinkedHashMap<String, Object> fields = new LinkedHashMap<>();
+ int start = hiveType.indexOf("<");
+ int end = hiveType.lastIndexOf(">");
+ String[] columns = hiveType.substring(start + 1, end).split(",");
+ for (String column : columns) {
+ String[] splits = column.split(":");
+ fields.put(splits[0],
covertHiveTypeToSeaTunnelType(splits[1]));
+ }
+ return fields;
+ }
+ return hiveType;
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java
index 1a1f954ae..f53384a41 100644
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java
@@ -19,26 +19,22 @@ package
org.apache.seatunnel.connectors.seatunnel.hive.utils;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
-import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig;
import
org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorErrorCode;
import
org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorException;
-import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.thrift.TException;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
-import java.io.IOException;
import java.util.List;
import java.util.Objects;
@@ -55,34 +51,8 @@ public class HiveMetaStoreProxy {
&&
config.hasPath(BaseSourceConfig.KERBEROS_KEYTAB_PATH.key())) {
String principal =
config.getString(BaseSourceConfig.KERBEROS_PRINCIPAL.key());
String keytabPath =
config.getString(BaseSourceConfig.KERBEROS_KEYTAB_PATH.key());
- if (StringUtils.isBlank(principal) ||
StringUtils.isBlank(keytabPath)) {
- String errorMsg =
- String.format(
- "Kerberos principal [%s] or keytab file path
[%s] is blank,"
- + "please check",
- principal, keytabPath);
- throw new HiveConnectorException(
- CommonErrorCode.KERBEROS_AUTHORIZED_FAILED, errorMsg);
- }
Configuration configuration = new Configuration();
- configuration.set("hadoop.security.authentication", "kerberos");
- UserGroupInformation.setConfiguration(configuration);
- try {
- log.info(
- "Start Kerberos authentication using principal {} and
keytab {}",
- principal,
- keytabPath);
- UserGroupInformation.loginUserFromKeytab(principal,
keytabPath);
- log.info("Kerberos authentication successful");
- } catch (IOException e) {
- String errorMsg =
- String.format(
- "Kerberos authentication failed using this "
- + "principal [%s] and keytab path
[%s]",
- principal, keytabPath);
- throw new FileConnectorException(
- CommonErrorCode.KERBEROS_AUTHORIZED_FAILED, errorMsg,
e);
- }
+ FileSystemUtils.doKerberosAuthentication(configuration, principal,
keytabPath);
}
try {
hiveMetaStoreClient = new HiveMetaStoreClient(hiveConf);
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
index 9cc18be96..8c4fcc490 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
@@ -46,6 +46,7 @@ import
org.apache.seatunnel.format.json.JsonDeserializationSchema;
import org.apache.seatunnel.format.json.canal.CanalJsonDeserializationSchema;
import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
import org.apache.seatunnel.format.text.TextDeserializationSchema;
+import org.apache.seatunnel.format.text.constant.TextFormatConstant;
import org.apache.kafka.common.TopicPartition;
@@ -259,7 +260,7 @@ public class KafkaSource
this.deserializationSchema =
TextDeserializationSchema.builder()
.seaTunnelRowType(typeInfo)
- .delimiter(String.valueOf('\002'))
+ .delimiter(TextFormatConstant.PLACEHOLDER)
.build();
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
index 31aed642e..a8bbdf0d1 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
@@ -70,6 +70,10 @@ public class LocalFileIT extends TestSuiteBase {
Container.ExecResult textReadResult =
container.executeJob("/text/local_file_text_to_assert.conf");
Assertions.assertEquals(0, textReadResult.getExitCode());
+ // test read local text file with projection
+ Container.ExecResult textProjectionResult =
+
container.executeJob("/text/local_file_text_projection_to_assert.conf");
+ Assertions.assertEquals(0, textProjectionResult.getExitCode());
// test write local json file
Container.ExecResult jsonWriteResult =
container.executeJob("/json/fake_to_local_file_json.conf");
@@ -86,6 +90,10 @@ public class LocalFileIT extends TestSuiteBase {
Container.ExecResult orcReadResult =
container.executeJob("/orc/local_file_orc_to_assert.conf");
Assertions.assertEquals(0, orcReadResult.getExitCode());
+ // test read local orc file with projection
+ Container.ExecResult orcProjectionResult =
+
container.executeJob("/orc/local_file_orc_projection_to_assert.conf");
+ Assertions.assertEquals(0, orcProjectionResult.getExitCode());
// test write local parquet file
Container.ExecResult parquetWriteResult =
container.executeJob("/parquet/fake_to_local_file_parquet.conf");
@@ -94,5 +102,9 @@ public class LocalFileIT extends TestSuiteBase {
Container.ExecResult parquetReadResult =
container.executeJob("/parquet/local_file_parquet_to_assert.conf");
Assertions.assertEquals(0, parquetReadResult.getExitCode());
+ // test read local parquet file with projection
+ Container.ExecResult parquetProjectionResult =
+
container.executeJob("/parquet/local_file_parquet_projection_to_assert.conf");
+ Assertions.assertEquals(0, parquetProjectionResult.getExitCode());
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/e2e.orc
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/e2e.orc
index 507ce3703..d50f6bb54 100644
Binary files
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/e2e.orc
and
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/e2e.orc
differ
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/local_file_orc_to_assert.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/local_file_orc_projection_to_assert.conf
similarity index 80%
copy from
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/local_file_orc_to_assert.conf
copy to
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/local_file_orc_projection_to_assert.conf
index d99fef31a..9cde028af 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/local_file_orc_to_assert.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/local_file_orc_projection_to_assert.conf
@@ -27,8 +27,9 @@ env {
source {
LocalFile {
- path = "/seatunnel/read/parquet"
- type = "parquet"
+ path = "/seatunnel/read/orc"
+ type = "orc"
+ read_columns = [c_string, c_boolean, c_double]
result_table_name = "fake"
}
}
@@ -69,24 +70,6 @@ sink {
rule_type = NOT_NULL
}
]
- },
- {
- field_name = name
- field_type = string
- field_value = [
- {
- rule_type = NOT_NULL
- }
- ]
- },
- {
- field_name = hobby
- field_type = string
- field_value = [
- {
- rule_type = NOT_NULL
- }
- ]
}
]
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/local_file_orc_to_assert.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/local_file_orc_to_assert.conf
index d99fef31a..7549af6fd 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/local_file_orc_to_assert.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/local_file_orc_to_assert.conf
@@ -27,8 +27,8 @@ env {
source {
LocalFile {
- path = "/seatunnel/read/parquet"
- type = "parquet"
+ path = "/seatunnel/read/orc"
+ type = "orc"
result_table_name = "fake"
}
}
@@ -69,24 +69,6 @@ sink {
rule_type = NOT_NULL
}
]
- },
- {
- field_name = name
- field_type = string
- field_value = [
- {
- rule_type = NOT_NULL
- }
- ]
- },
- {
- field_name = hobby
- field_type = string
- field_value = [
- {
- rule_type = NOT_NULL
- }
- ]
}
]
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/local_file_orc_to_assert.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/local_file_parquet_projection_to_assert.conf
similarity index 83%
copy from
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/local_file_orc_to_assert.conf
copy to
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/local_file_parquet_projection_to_assert.conf
index d99fef31a..dac3515a0 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/local_file_orc_to_assert.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/local_file_parquet_projection_to_assert.conf
@@ -29,6 +29,7 @@ source {
LocalFile {
path = "/seatunnel/read/parquet"
type = "parquet"
+ read_columns = [c_string, c_boolean, c_double]
result_table_name = "fake"
}
}
@@ -69,24 +70,6 @@ sink {
rule_type = NOT_NULL
}
]
- },
- {
- field_name = name
- field_type = string
- field_value = [
- {
- rule_type = NOT_NULL
- }
- ]
- },
- {
- field_name = hobby
- field_type = string
- field_value = [
- {
- rule_type = NOT_NULL
- }
- ]
}
]
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/e2e.txt
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/e2e.txt
index 727d77044..9871cd85e 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/e2e.txt
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/e2e.txt
@@ -1,5 +1,5 @@
-{"Eayvq":"YVrWJ","kegtW":"jshQP","ShRel":"abmwR","vDjnr":"gnaVl","zGERi":"nlZlS"}[1208226851,1011688192,220827945,1047769453,1137173284]ajNqtfalse1203130814396025058012143372451870729.532721E371.566752199436781E308eqGaK2022-05-0984150670027331165724.3898288671923612152022-01-10
00:52:00{"YZCPo":"saHIr","NXuEq":"lRvfb","rpZUW":"TsaaM","fySFX":"jRTjq","kxmOB":"IYcoV"}[2014870053,823563564,783454480,651161159,704692328]mMPmRtrue221632040893809116054760376851640323.1975
[...]
-{"njZoW":"GMeYm","UcdNW":"QzeUB","KEUVX":"gBDFK","HYQAp":"wrkAU","CIuxE":"xbFow"}[2031111683,1178991118,2039039138,1613817066,110479538]NJdgitrue-89290489565971423719286406107975683.2706686E385.558510322527257E305TsBfe2022-11-0226360128806482056292.3973861503683968622022-07-19
01:55:00{"OYtro":"ujYpV","CmwWw":"scFCA","sLsBv":"OWHCB","pyEBC":"IBJaJ","dcnST":"EWNUE"}[1375044625,1699399583,1568825650,1259259717,1993279080]xGYNztrue-1061370319488882359202931209018536960
[...]
-{"RHdeD":"IfOaZ","qrTJC":"kyLmx","aangN":"lAeap","ZoLNq":"BUFPs","xgRbl":"wxinc"}[164178112,1126624441,15024852,1149751089,1521818730]YvrXsfalse232515032008388530651078388625039361.6630125E381.7843554887310442E308vpGjb2022-03-2178172562452015409780.5845612134798772192022-08-13
07:35:00{"tzgLt":"kDbVh","WQjRn":"qJbFw","CkgpP":"VAefn","ugYhg":"JsunV","IiEeb":"quvGf"}[1776928121,1957734724,524685509,561366753,938508932]mkHAitrue922741247279574969724970679742218241.7999
[...]
-{"jjfbQ":"YnIXv","yENje":"ffDvT","eMpYW":"LZvRw","oheFK":"QjTUE","OULct":"xrYhB"}[104016153,1372901715,1383968877,1615642149,195294861]vMstWfalse-7625594116342148311317935807474656001.792487E381.0428995536163822E308JSzTD2022-05-2455390102538242453238.6893233870974102472022-04-02
03:07:00{"YISEv":"PRNAA","iOZbP":"RuJAC","xUpOy":"KQrHA","KDQtr":"UZwht","KRfJK":"KZpCe"}[1746626910,1211890529,1600627099,866858633,1489006903]wfDHTfalse-126224418100044198439134512658444288
[...]
-{"Bdbdl":"oODJP","KqLlu":"vyGtf","jvGPe":"yRctE","rRyXw":"BSABQ","rHvsQ":"VCUqr"}[333070022,1078936427,1553422323,979329793,1601100765]Zbmuofalse1081556163411943031919830971434823681.1692632E381.0154981418689449E308IuDpI2022-06-1518027895538483530319.4903776087281118622022-06-23
00:40:00{"cqeow":"wXwCu","nFjrN":"zWila","lTedA":"WrGgl","NPLcF":"RXUhL","EIzgS":"MFnLE"}[814534518,1778618727,2107481454,1240038762,974703010]HtbvRtrue12423258182702762422924058106246453761
[...]
\ No newline at end of file
+uDDrwsQQYONTNeUBIOnLAgunvDqLBObroRzdEdvDgRmgaeFyFH5456857591576298739157764687713794636442057612252MTDnafalse3313846190943192276641872220071936002.4798444E389.52375328387482E307vcIGF2023-06-0776258155390368615610.7646252373186602912023-05-08
16:08:51ipToEdierOAbwQfQzObWqiRhjkWYaMKdCbjurhstsWrAVlRyyR2905930362869031292782506910815576701385108050hArFutrue12631169122166306155952414159791708165.949173E372.1775762383875058E307kMlgO2023-05-20
[...]
+QIpzzZNFkLwARZDSdwdBzkegCdIRVYJnuXgxNXytAJxxaTzmDF16603816781145850255103997497062535321459349811xaTOktrue5327578191749099325840234439082792961.955231E381.5072154481920294E308GDWOu2023-05-0581449039533149712064.4515003874168475032023-07-06
22:34:11sfgxhqvOLzjdTSNcNaWfEnZqvQraSSuMPazCGhPmSrGuxggqGh111449466287130860562118177510004750271267350957FDhTstrue96247293946402921952995131535667203.3240283E384.473485404447698E307YFdwf2023-02-04294
[...]
+xVJPgVlosBlTYSkmJCqKHMXzbZkNQKInuVMZeYGhsmzUmcLyPx137745493211075991209783701051546835517166168384qcYaifalse8318050110096656524405690917018449922.9617934E371.8901064340036343E307jaKMq2023-05-1275317114043170470995.9654034735914367862023-05-18
08:09:22raGGBnHsNwMZKemkFErUbedNjSllNcKOVUGdTpXcHGSVphHsNE86377304018502081846122308810391870441519757437JCRZStrue1829974183977114228752256792969205767.9090967E371.6286963710372255E308NBHUB2023-05-0
[...]
+dBgFeTKkCfnxCljyGfNEurEzCVgwpsHgmcOfYXiQHxeeQNjQuq1961913761867016982512369059615238191571813320BTfhbfalse652666522281866957533025299230722.1456136E381.2398422714159417E308YOiwg2023-10-2433001899362876139955.7235198795513055732023-06-23
13:46:46jsvmHLHlXCGFKwuqlTwAjdMckElrmqgBWvOuuKuWxcinFZWSky19959088245502706421265289671411088181469730839vUyULtrue952655754382886132164227350822215681.9033253E381.0966562906060974E308XFeKf2023-09-1731084
[...]
+obtYzIHOTKsABVtirEKEMYUYobsYlDJcFbpQUYvGxCcKlnswEG8096984004544201585383739017658796661353001394xchcntrue853141253976762312923177914159380482.8480754E381.055208146200822E308MSkTD2023-11-2420361788179232141281.9718823433892185262023-10-25
11:47:50gdCWZMGESyarjQPopBhDwKnOyDvaUDgQOEDRCmfUAagfnDDPqV8473436731118772451890654127233667151574025969ewJzLtrue6321769209768782446484076920790579202.7134378E381.1883616449174808E308STvOu2023-10-082179
[...]
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/local_file_orc_to_assert.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_text_projection_to_assert.conf
similarity index 68%
copy from
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/local_file_orc_to_assert.conf
copy to
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_text_projection_to_assert.conf
index d99fef31a..9a70cf768 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/local_file_orc_to_assert.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_text_projection_to_assert.conf
@@ -27,8 +27,43 @@ env {
source {
LocalFile {
- path = "/seatunnel/read/parquet"
- type = "parquet"
+ path = "/seatunnel/read/text"
+ type = "text"
+ read_columns = [c_string, c_boolean, c_double]
+ schema = {
+ fields {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ c_row = {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ }
+ }
+ }
result_table_name = "fake"
}
}
diff --git
a/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextDeserializationSchema.java
b/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextDeserializationSchema.java
index e2476ffe3..86f25a69b 100644
---
a/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextDeserializationSchema.java
+++
b/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextDeserializationSchema.java
@@ -17,8 +17,6 @@
package org.apache.seatunnel.format.text;
-import
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ArrayNode;
-
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.BasicType;
@@ -26,44 +24,100 @@ import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.api.table.type.SqlType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.utils.DateTimeUtils;
import org.apache.seatunnel.common.utils.DateUtils;
-import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.common.utils.TimeUtils;
+import org.apache.seatunnel.format.text.constant.TextFormatConstant;
import org.apache.seatunnel.format.text.exception.SeaTunnelTextFormatException;
import org.apache.commons.lang3.StringUtils;
-import lombok.Builder;
import lombok.NonNull;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
-@Builder
public class TextDeserializationSchema implements
DeserializationSchema<SeaTunnelRow> {
- @NonNull private SeaTunnelRowType seaTunnelRowType;
- @NonNull private String delimiter;
- @Builder.Default private DateUtils.Formatter dateFormatter =
DateUtils.Formatter.YYYY_MM_DD;
+ private final SeaTunnelRowType seaTunnelRowType;
+ private final String[] separators;
+ private final DateUtils.Formatter dateFormatter;
+ private final DateTimeUtils.Formatter dateTimeFormatter;
+ private final TimeUtils.Formatter timeFormatter;
+
+ private TextDeserializationSchema(
+ @NonNull SeaTunnelRowType seaTunnelRowType,
+ String[] separators,
+ DateUtils.Formatter dateFormatter,
+ DateTimeUtils.Formatter dateTimeFormatter,
+ TimeUtils.Formatter timeFormatter) {
+ this.seaTunnelRowType = seaTunnelRowType;
+ this.separators = separators;
+ this.dateFormatter = dateFormatter;
+ this.dateTimeFormatter = dateTimeFormatter;
+ this.timeFormatter = timeFormatter;
+ }
- @Builder.Default
- private DateTimeUtils.Formatter dateTimeFormatter =
DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS;
+ public static Builder builder() {
+ return new Builder();
+ }
- @Builder.Default private TimeUtils.Formatter timeFormatter =
TimeUtils.Formatter.HH_MM_SS;
+ public static class Builder {
+ private SeaTunnelRowType seaTunnelRowType;
+ private String[] separators = TextFormatConstant.SEPARATOR.clone();
+ private DateUtils.Formatter dateFormatter =
DateUtils.Formatter.YYYY_MM_DD;
+ private DateTimeUtils.Formatter dateTimeFormatter =
+ DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS;
+ private TimeUtils.Formatter timeFormatter =
TimeUtils.Formatter.HH_MM_SS;
+
+ private Builder() {}
+
+ public Builder seaTunnelRowType(SeaTunnelRowType seaTunnelRowType) {
+ this.seaTunnelRowType = seaTunnelRowType;
+ return this;
+ }
+
+ public Builder delimiter(String delimiter) {
+ this.separators[0] = delimiter;
+ return this;
+ }
+
+ public Builder separators(String[] separators) {
+ this.separators = separators;
+ return this;
+ }
+
+ public Builder dateFormatter(DateUtils.Formatter dateFormatter) {
+ this.dateFormatter = dateFormatter;
+ return this;
+ }
+
+ public Builder dateTimeFormatter(DateTimeUtils.Formatter
dateTimeFormatter) {
+ this.dateTimeFormatter = dateTimeFormatter;
+ return this;
+ }
+
+ public Builder timeFormatter(TimeUtils.Formatter timeFormatter) {
+ this.timeFormatter = timeFormatter;
+ return this;
+ }
+
+ public TextDeserializationSchema build() {
+ return new TextDeserializationSchema(
+ seaTunnelRowType, separators, dateFormatter,
dateTimeFormatter, timeFormatter);
+ }
+ }
@Override
public SeaTunnelRow deserialize(byte[] message) throws IOException {
String content = new String(message);
- Map<Integer, String> splitsMap = splitLineBySeaTunnelRowType(content,
seaTunnelRowType);
+ Map<Integer, String> splitsMap = splitLineBySeaTunnelRowType(content,
seaTunnelRowType, 0);
Object[] objects = new Object[seaTunnelRowType.getTotalFields()];
for (int i = 0; i < objects.length; i++) {
- objects[i] = convert(splitsMap.get(i),
seaTunnelRowType.getFieldType(i));
+ objects[i] = convert(splitsMap.get(i),
seaTunnelRowType.getFieldType(i), 0);
}
return new SeaTunnelRow(objects);
}
@@ -74,50 +128,34 @@ public class TextDeserializationSchema implements
DeserializationSchema<SeaTunne
}
private Map<Integer, String> splitLineBySeaTunnelRowType(
- String line, SeaTunnelRowType seaTunnelRowType) {
- String[] splits = line.split(delimiter, -1);
+ String line, SeaTunnelRowType seaTunnelRowType, int level) {
+ String[] splits = line.split(separators[level], -1);
LinkedHashMap<Integer, String> splitsMap = new LinkedHashMap<>();
SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
- int cursor = 0;
- for (int i = 0; i < fieldTypes.length; i++) {
- if (fieldTypes[i].getSqlType() == SqlType.ROW) {
- // row type
- int totalFields = ((SeaTunnelRowType)
fieldTypes[i]).getTotalFields();
- // if current field is empty
- if (cursor >= splits.length) {
- splitsMap.put(i, null);
- } else {
- ArrayList<String> rowSplits =
- new ArrayList<>(
- Arrays.asList(splits).subList(cursor,
cursor + totalFields));
- splitsMap.put(i, String.join(delimiter, rowSplits));
- }
- cursor += totalFields;
- } else {
- // not row type
- // if current field is empty
- if (cursor >= splits.length) {
- splitsMap.put(i, null);
- cursor++;
- } else {
- splitsMap.put(i, splits[cursor++]);
- }
+ for (int i = 0; i < splits.length; i++) {
+ splitsMap.put(i, splits[i]);
+ }
+ if (fieldTypes.length > splits.length) {
+ // contains partition columns
+ for (int i = splits.length; i < fieldTypes.length; i++) {
+ splitsMap.put(i, null);
}
}
return splitsMap;
}
- private Object convert(String field, SeaTunnelDataType<?> fieldType) {
+ private Object convert(String field, SeaTunnelDataType<?> fieldType, int
level) {
if (StringUtils.isBlank(field)) {
return null;
}
switch (fieldType.getSqlType()) {
case ARRAY:
BasicType<?> elementType = ((ArrayType<?, ?>)
fieldType).getElementType();
- ArrayNode jsonNodes = JsonUtils.parseArray(field);
+ String[] elements = field.split(separators[level + 1]);
ArrayList<Object> objectArrayList = new ArrayList<>();
- jsonNodes.forEach(
- jsonNode ->
objectArrayList.add(convert(jsonNode.toString(), elementType)));
+ for (String element : elements) {
+ objectArrayList.add(convert(element, elementType, level +
1));
+ }
switch (elementType.getSqlType()) {
case STRING:
return objectArrayList.toArray(new String[0]);
@@ -146,10 +184,13 @@ public class TextDeserializationSchema implements
DeserializationSchema<SeaTunne
SeaTunnelDataType<?> keyType = ((MapType<?, ?>)
fieldType).getKeyType();
SeaTunnelDataType<?> valueType = ((MapType<?, ?>)
fieldType).getValueType();
LinkedHashMap<Object, Object> objectMap = new
LinkedHashMap<>();
- Map<String, String> fieldsMap = JsonUtils.toMap(field);
- fieldsMap.forEach(
- (key, value) ->
- objectMap.put(convert(key, keyType),
convert(value, valueType)));
+ String[] kvs = field.split(separators[level + 1]);
+ for (String kv : kvs) {
+ String[] splits = kv.split(separators[level + 2]);
+ objectMap.put(
+ convert(splits[0], keyType, level + 1),
+ convert(splits[1], valueType, level + 1));
+ }
return objectMap;
case STRING:
return field;
@@ -181,13 +222,14 @@ public class TextDeserializationSchema implements
DeserializationSchema<SeaTunne
return DateTimeUtils.parse(field, dateTimeFormatter);
case ROW:
Map<Integer, String> splitsMap =
- splitLineBySeaTunnelRowType(field, (SeaTunnelRowType)
fieldType);
+ splitLineBySeaTunnelRowType(field, (SeaTunnelRowType)
fieldType, level + 1);
Object[] objects = new Object[splitsMap.size()];
for (int i = 0; i < objects.length; i++) {
objects[i] =
convert(
splitsMap.get(i),
- ((SeaTunnelRowType)
fieldType).getFieldType(i));
+ ((SeaTunnelRowType)
fieldType).getFieldType(i),
+ level + 1);
}
return new SeaTunnelRow(objects);
default:
diff --git
a/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextSerializationSchema.java
b/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextSerializationSchema.java
index 1c7293adf..59a65e1e2 100644
---
a/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextSerializationSchema.java
+++
b/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextSerializationSchema.java
@@ -18,14 +18,17 @@
package org.apache.seatunnel.format.text;
import org.apache.seatunnel.api.serialization.SerializationSchema;
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.utils.DateTimeUtils;
import org.apache.seatunnel.common.utils.DateUtils;
-import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.common.utils.TimeUtils;
+import org.apache.seatunnel.format.text.constant.TextFormatConstant;
import org.apache.seatunnel.format.text.exception.SeaTunnelTextFormatException;
import lombok.Builder;
@@ -34,17 +37,79 @@ import lombok.NonNull;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.stream.Collectors;
-@Builder
public class TextSerializationSchema implements SerializationSchema {
- @NonNull private SeaTunnelRowType seaTunnelRowType;
- @NonNull private String delimiter;
- @Builder.Default private DateUtils.Formatter dateFormatter =
DateUtils.Formatter.YYYY_MM_DD;
+ private final SeaTunnelRowType seaTunnelRowType;
+ private final String[] separators;
+ private final DateUtils.Formatter dateFormatter;
+ private final DateTimeUtils.Formatter dateTimeFormatter;
+ private final TimeUtils.Formatter timeFormatter;
- @Builder.Default
- private DateTimeUtils.Formatter dateTimeFormatter =
DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS;
+ private TextSerializationSchema(
+ @NonNull SeaTunnelRowType seaTunnelRowType,
+ String[] separators,
+ DateUtils.Formatter dateFormatter,
+ DateTimeUtils.Formatter dateTimeFormatter,
+ TimeUtils.Formatter timeFormatter) {
+ this.seaTunnelRowType = seaTunnelRowType;
+ this.separators = separators;
+ this.dateFormatter = dateFormatter;
+ this.dateTimeFormatter = dateTimeFormatter;
+ this.timeFormatter = timeFormatter;
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static class Builder {
+ private SeaTunnelRowType seaTunnelRowType;
+ private String[] separators = TextFormatConstant.SEPARATOR.clone();
+ private DateUtils.Formatter dateFormatter =
DateUtils.Formatter.YYYY_MM_DD;
+ private DateTimeUtils.Formatter dateTimeFormatter =
+ DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS;
+ private TimeUtils.Formatter timeFormatter =
TimeUtils.Formatter.HH_MM_SS;
+
+ private Builder() {}
- @Builder.Default private TimeUtils.Formatter timeFormatter =
TimeUtils.Formatter.HH_MM_SS;
+ public Builder seaTunnelRowType(SeaTunnelRowType seaTunnelRowType) {
+ this.seaTunnelRowType = seaTunnelRowType;
+ return this;
+ }
+
+ public Builder delimiter(String delimiter) {
+ this.separators[0] = delimiter;
+ return this;
+ }
+
+ public Builder separators(String[] separators) {
+ this.separators = separators;
+ return this;
+ }
+
+ public Builder dateFormatter(DateUtils.Formatter dateFormatter) {
+ this.dateFormatter = dateFormatter;
+ return this;
+ }
+
+ public Builder dateTimeFormatter(DateTimeUtils.Formatter
dateTimeFormatter) {
+ this.dateTimeFormatter = dateTimeFormatter;
+ return this;
+ }
+
+ public Builder timeFormatter(TimeUtils.Formatter timeFormatter) {
+ this.timeFormatter = timeFormatter;
+ return this;
+ }
+
+ public TextSerializationSchema build() {
+ return new TextSerializationSchema(
+ seaTunnelRowType, separators, dateFormatter,
dateTimeFormatter, timeFormatter);
+ }
+ }
@Override
public byte[] serialize(SeaTunnelRow element) {
@@ -55,12 +120,12 @@ public class TextSerializationSchema implements
SerializationSchema {
Object[] fields = element.getFields();
String[] strings = new String[fields.length];
for (int i = 0; i < fields.length; i++) {
- strings[i] = convert(fields[i], seaTunnelRowType.getFieldType(i));
+ strings[i] = convert(fields[i], seaTunnelRowType.getFieldType(i),
0);
}
- return String.join(delimiter, strings).getBytes();
+ return String.join(separators[0], strings).getBytes();
}
- private String convert(Object field, SeaTunnelDataType<?> fieldType) {
+ private String convert(Object field, SeaTunnelDataType<?> fieldType, int
level) {
if (field == null) {
return "";
}
@@ -86,15 +151,36 @@ public class TextSerializationSchema implements
SerializationSchema {
case BYTES:
return new String((byte[]) field);
case ARRAY:
+ BasicType<?> elementType = ((ArrayType<?, ?>)
fieldType).getElementType();
+ return Arrays.stream((Object[]) field)
+ .map(f -> convert(f, elementType, level + 1))
+ .collect(Collectors.joining(separators[level + 1]));
case MAP:
- return JsonUtils.toJsonString(field);
+ SeaTunnelDataType<?> keyType = ((MapType<?, ?>)
fieldType).getKeyType();
+ SeaTunnelDataType<?> valueType = ((MapType<?, ?>)
fieldType).getValueType();
+ return ((Map<Object, Object>) field)
+ .entrySet().stream()
+ .map(
+ entry ->
+ String.join(
+ separators[level + 2],
+
convert(entry.getKey(), keyType, level + 1),
+ convert(
+
entry.getValue(),
+ valueType,
+ level + 1)))
+ .collect(Collectors.joining(separators[level +
1]));
case ROW:
Object[] fields = ((SeaTunnelRow) field).getFields();
String[] strings = new String[fields.length];
for (int i = 0; i < fields.length; i++) {
- strings[i] = convert(fields[i], ((SeaTunnelRowType)
fieldType).getFieldType(i));
+ strings[i] =
+ convert(
+ fields[i],
+ ((SeaTunnelRowType)
fieldType).getFieldType(i),
+ level + 1);
}
- return String.join(delimiter, strings);
+ return String.join(separators[level + 1], strings);
default:
throw new SeaTunnelTextFormatException(
CommonErrorCode.UNSUPPORTED_DATA_TYPE,
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FileConnectorErrorCode.java
b/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/constant/TextFormatConstant.java
similarity index 51%
copy from
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FileConnectorErrorCode.java
copy to
seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/constant/TextFormatConstant.java
index e966ebcd5..76e115d3e 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FileConnectorErrorCode.java
+++
b/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/constant/TextFormatConstant.java
@@ -15,30 +15,14 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.file.exception;
+package org.apache.seatunnel.format.text.constant;
-import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+public class TextFormatConstant {
-public enum FileConnectorErrorCode implements SeaTunnelErrorCode {
- FILE_TYPE_INVALID("FILE-01", "File type is invalid"),
- DATA_DESERIALIZE_FAILED("FILE-02", "Data deserialization failed"),
- FILE_LIST_GET_FAILED("FILE-03", "Get file list failed");
+ public static final String[] SEPARATOR =
+ new String[] {"\u0001", "\u0002", "\u0003", "\u0004", "\u0005",
"\u0006", "\u0007"};
- private final String code;
- private final String description;
+ public static final String PLACEHOLDER = "\u0008";
- FileConnectorErrorCode(String code, String description) {
- this.code = code;
- this.description = description;
- }
-
- @Override
- public String getCode() {
- return code;
- }
-
- @Override
- public String getDescription() {
- return description;
- }
+ private TextFormatConstant() {}
}
diff --git
a/seatunnel-formats/seatunnel-format-text/src/test/java/org/apache/seatunnel/format/text/TextFormatSchemaTest.java
b/seatunnel-formats/seatunnel-format-text/src/test/java/org/apache/seatunnel/format/text/TextFormatSchemaTest.java
index 1d272797c..7d904e2c8 100644
---
a/seatunnel-formats/seatunnel-format-text/src/test/java/org/apache/seatunnel/format/text/TextFormatSchemaTest.java
+++
b/seatunnel-formats/seatunnel-format-text/src/test/java/org/apache/seatunnel/format/text/TextFormatSchemaTest.java
@@ -32,26 +32,37 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
+import java.util.Arrays;
import java.util.Map;
public class TextFormatSchemaTest {
public String content =
- "[1,2,3,4,5,6]#"
- + "{\"tyrantlucifer\":18,\"Kris\":21}#"
- + "tyrantlucifer#"
- + "true#"
- + "1#"
- + "2#"
- + "3#"
- + "4#"
- + "6.66#"
- + "7.77#"
- + "8.8888888#"
- + "#"
- + "tyrantlucifer#"
- + "2022-09-24#"
- + "22:45:00#"
- + "2022-09-24 22:45:00";
+ String.join("\u0002", Arrays.asList("1", "2", "3", "4", "5", "6"))
+ + '\001'
+ + "tyrantlucifer"
+ + '\003'
+ + "18"
+ + '\002'
+ + "Kris"
+ + '\003'
+ + "21\001"
+ + "tyrantlucifer\001"
+ + "true\001"
+ + "1\001"
+ + "2\001"
+ + "3\001"
+ + "4\001"
+ + "6.66\001"
+ + "7.77\001"
+ + "8.8888888\001"
+ + '\001'
+ + "tyrantlucifer\001"
+ + "2022-09-24\001"
+ + "22:45:00\001"
+ + "2022-09-24 22:45:00\001"
+ + String.join("\u0003", Arrays.asList("1", "2", "3", "4",
"5", "6"))
+ + '\002'
+ + "tyrantlucifer\00418\003Kris\00421";
public SeaTunnelRowType seaTunnelRowType;
@@ -76,7 +87,8 @@ public class TextFormatSchemaTest {
"bytes_field",
"date_field",
"time_field",
- "timestamp_field"
+ "timestamp_field",
+ "row_field"
},
new SeaTunnelDataType<?>[] {
ArrayType.INT_ARRAY_TYPE,
@@ -94,7 +106,15 @@ public class TextFormatSchemaTest {
PrimitiveByteArrayType.INSTANCE,
LocalTimeType.LOCAL_DATE_TYPE,
LocalTimeType.LOCAL_TIME_TYPE,
- LocalTimeType.LOCAL_DATE_TIME_TYPE
+ LocalTimeType.LOCAL_DATE_TIME_TYPE,
+ new SeaTunnelRowType(
+ new String[] {
+ "array_field", "map_field",
+ },
+ new SeaTunnelDataType<?>[] {
+ ArrayType.INT_ARRAY_TYPE,
+ new MapType<>(BasicType.STRING_TYPE,
BasicType.INT_TYPE),
+ })
});
}
@@ -103,12 +123,12 @@ public class TextFormatSchemaTest {
TextDeserializationSchema deserializationSchema =
TextDeserializationSchema.builder()
.seaTunnelRowType(seaTunnelRowType)
- .delimiter("#")
+ .delimiter("\u0001")
.build();
TextSerializationSchema serializationSchema =
TextSerializationSchema.builder()
.seaTunnelRowType(seaTunnelRowType)
- .delimiter("#")
+ .delimiter("\u0001")
.build();
SeaTunnelRow seaTunnelRow =
deserializationSchema.deserialize(content.getBytes());
String data = new String(serializationSchema.serialize(seaTunnelRow));