This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new a3c13e59eb [Feature][Connector V2][File] Add config of
'file_filter_pattern', which used for filtering files. (#5153)
a3c13e59eb is described below
commit a3c13e59eb30d603f326442bd784a0eb85f350a6
Author: FlechazoW <[email protected]>
AuthorDate: Mon Jul 31 17:33:36 2023 +0800
[Feature][Connector V2][File] Add config of 'file_filter_pattern', which
used for filtering files. (#5153)
* [Feature][Connector V2][File] Add config of 'file_filter_pattern', which
used for filtering files.
---
docs/en/connector-v2/source/CosFile.md | 5 +
docs/en/connector-v2/source/FtpFile.md | 1 +
docs/en/connector-v2/source/HdfsFile.md | 5 +
docs/en/connector-v2/source/LocalFile.md | 5 +
docs/en/connector-v2/source/OssFile.md | 5 +
docs/en/connector-v2/source/OssJindoFile.md | 5 +
docs/en/connector-v2/source/S3File.md | 5 +
docs/en/connector-v2/source/SftpFile.md | 5 +
.../file/hdfs/source/BaseHdfsFileSource.java | 7 +-
.../seatunnel/file/config/BaseSourceConfig.java | 7 +
.../file/source/reader/AbstractReadStrategy.java | 30 ++++-
.../file/cos/source/CosFileSourceFactory.java | 1 +
.../file/ftp/source/FtpFileSourceFactory.java | 1 +
.../file/hdfs/source/HdfsFileSourceFactory.java | 1 +
.../file/oss/source/OssFileSourceFactory.java | 1 +
.../file/local/source/LocalFileSourceFactory.java | 1 +
.../file/oss/source/OssFileSourceFactory.java | 1 +
.../file/s3/source/S3FileSourceFactory.java | 1 +
.../file/sftp/source/SftpFileSourceFactory.java | 1 +
.../e2e/connector/file/ftp/FtpFileIT.java | 86 +++++--------
.../excel/ftp_filter_excel_to_assert.conf | 141 +++++++++++++++++++++
.../e2e/connector/file/local/LocalFileIT.java | 119 +++++++----------
.../excel/local_filter_excel_to_assert.conf | 131 +++++++++++++++++++
.../e2e/connector/file/fstp/SftpFileIT.java | 78 +++++-------
.../excel/sftp_filter_excel_to_assert.conf | 132 +++++++++++++++++++
.../seatunnel/e2e/common/container/TestHelper.java | 40 ++++++
.../seatunnel/e2e/common/util/ContainerUtil.java | 6 +
27 files changed, 649 insertions(+), 172 deletions(-)
diff --git a/docs/en/connector-v2/source/CosFile.md
b/docs/en/connector-v2/source/CosFile.md
index 18fc0299c9..dd1e77ebcf 100644
--- a/docs/en/connector-v2/source/CosFile.md
+++ b/docs/en/connector-v2/source/CosFile.md
@@ -55,6 +55,7 @@ Read all the data in a split in a pollNext call. What splits
are read will be sa
| schema | config | no | - |
| common-options | | no | - |
| sheet_name | string | no | - |
+| file_filter_pattern | string | no | - |
### path [string]
@@ -247,6 +248,10 @@ Source plugin common parameters, please refer to [Source
Common Options](common-
Reader the sheet of the workbook,Only used when file_format is excel.
+### file_filter_pattern [string]
+
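+Filter pattern used to select the files to read. It is a regular expression that must match the whole file name (not the full path); files whose names do not match are skipped. For example, `e2e_filter.*` only reads files whose names start with `e2e_filter`.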
+
## Example
```hocon
diff --git a/docs/en/connector-v2/source/FtpFile.md
b/docs/en/connector-v2/source/FtpFile.md
index b550bde8ba..6737511e63 100644
--- a/docs/en/connector-v2/source/FtpFile.md
+++ b/docs/en/connector-v2/source/FtpFile.md
@@ -48,6 +48,7 @@ If you use SeaTunnel Engine, It automatically integrated the
hadoop jar when you
| schema | config | no | - |
| common-options | | no | - |
| sheet_name | string | no | - |
+| file_filter_pattern | string | no | - |
### host [string]
diff --git a/docs/en/connector-v2/source/HdfsFile.md
b/docs/en/connector-v2/source/HdfsFile.md
index d255f4fd3a..1d285c539a 100644
--- a/docs/en/connector-v2/source/HdfsFile.md
+++ b/docs/en/connector-v2/source/HdfsFile.md
@@ -53,6 +53,7 @@ Read all the data in a split in a pollNext call. What splits
are read will be sa
| schema | config | no | - |
| common-options | | no | - |
| sheet_name | string | no | - |
+| file_filter_pattern | string | no | - |
### path [string]
@@ -245,6 +246,10 @@ Source plugin common parameters, please refer to [Source
Common Options](common-
Reader the sheet of the workbook,Only used when file_format is excel.
+### file_filter_pattern [string]
+
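+Filter pattern used to select the files to read. It is a regular expression that must match the whole file name (not the full path); files whose names do not match are skipped. For example, `e2e_filter.*` only reads files whose names start with `e2e_filter`.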
+
## Example
```hocon
diff --git a/docs/en/connector-v2/source/LocalFile.md
b/docs/en/connector-v2/source/LocalFile.md
index d33288b7a5..7d80a4d2b7 100644
--- a/docs/en/connector-v2/source/LocalFile.md
+++ b/docs/en/connector-v2/source/LocalFile.md
@@ -49,6 +49,7 @@ Read all the data in a split in a pollNext call. What splits
are read will be sa
| schema | config | no | - |
| common-options | | no | - |
| sheet_name | string | no | - |
+| file_filter_pattern | string | no | - |
### path [string]
@@ -225,6 +226,10 @@ Source plugin common parameters, please refer to [Source
Common Options](common-
Reader the sheet of the workbook,Only used when file_format is excel.
+### file_filter_pattern [string]
+
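+Filter pattern used to select the files to read. It is a regular expression that must match the whole file name (not the full path); files whose names do not match are skipped. For example, `e2e_filter.*` only reads files whose names start with `e2e_filter`.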
+
## Example
```hocon
diff --git a/docs/en/connector-v2/source/OssFile.md
b/docs/en/connector-v2/source/OssFile.md
index 532b4d03aa..12f2141cd6 100644
--- a/docs/en/connector-v2/source/OssFile.md
+++ b/docs/en/connector-v2/source/OssFile.md
@@ -56,6 +56,7 @@ Read all the data in a split in a pollNext call. What splits
are read will be sa
| schema | config | no | - |
| common-options | | no | - |
| sheet_name | string | no | - |
+| file_filter_pattern | string | no | - |
### path [string]
@@ -282,6 +283,10 @@ Reader the sheet of the workbook,Only used when
file_format is excel.
```
+### file_filter_pattern [string]
+
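+Filter pattern used to select the files to read. It is a regular expression that must match the whole file name (not the full path); files whose names do not match are skipped. For example, `e2e_filter.*` only reads files whose names start with `e2e_filter`.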
+
## Changelog
### 2.2.0-beta 2022-09-26
diff --git a/docs/en/connector-v2/source/OssJindoFile.md
b/docs/en/connector-v2/source/OssJindoFile.md
index 3e3649e19b..913d277683 100644
--- a/docs/en/connector-v2/source/OssJindoFile.md
+++ b/docs/en/connector-v2/source/OssJindoFile.md
@@ -56,6 +56,7 @@ Read all the data in a split in a pollNext call. What splits
are read will be sa
| schema | config | no | - |
| common-options | | no | - |
| sheet_name | string | no | - |
+| file_filter_pattern | string | no | - |
### path [string]
@@ -248,6 +249,10 @@ Source plugin common parameters, please refer to [Source
Common Options](common-
Reader the sheet of the workbook,Only used when file_format is excel.
+### file_filter_pattern [string]
+
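+Filter pattern used to select the files to read. It is a regular expression that must match the whole file name (not the full path); files whose names do not match are skipped. For example, `e2e_filter.*` only reads files whose names start with `e2e_filter`.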
+
## Example
```hocon
diff --git a/docs/en/connector-v2/source/S3File.md
b/docs/en/connector-v2/source/S3File.md
index f58a1a6bc3..79a89be1c2 100644
--- a/docs/en/connector-v2/source/S3File.md
+++ b/docs/en/connector-v2/source/S3File.md
@@ -57,6 +57,7 @@ Read all the data in a split in a pollNext call. What splits
are read will be sa
| schema | config | no | -
|
| common-options | | no | -
|
| sheet_name | string | no | -
|
+| file_filter_pattern | string | no | -
|
### path [string]
@@ -299,6 +300,10 @@ Reader the sheet of the workbook,Only used when
file_format is excel.
```
+### file_filter_pattern [string]
+
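+Filter pattern used to select the files to read. It is a regular expression that must match the whole file name (not the full path); files whose names do not match are skipped. For example, `e2e_filter.*` only reads files whose names start with `e2e_filter`.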
+
## Changelog
### 2.3.0-beta 2022-10-20
diff --git a/docs/en/connector-v2/source/SftpFile.md
b/docs/en/connector-v2/source/SftpFile.md
index 500ec2af5b..22047d481e 100644
--- a/docs/en/connector-v2/source/SftpFile.md
+++ b/docs/en/connector-v2/source/SftpFile.md
@@ -47,6 +47,7 @@ If you use SeaTunnel Engine, It automatically integrated the
hadoop jar when you
| schema | config | no | - |
| common-options | | no | - |
| sheet_name | string | no | - |
+| file_filter_pattern | string | no | - |
### host [string]
@@ -226,6 +227,10 @@ Source plugin common parameters, please refer to [Source
Common Options](common-
Reader the sheet of the workbook,Only used when file_format is excel.
+### file_filter_pattern [string]
+
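+Filter pattern used to select the files to read. It is a regular expression that must match the whole file name (not the full path); files whose names do not match are skipped. For example, `e2e_filter.*` only reads files whose names start with `e2e_filter`.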
+
## Example
```hocon
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
index 9864fc3750..57d2ceca6e 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
@@ -79,12 +79,7 @@ public abstract class BaseHdfsFileSource extends
BaseFileSource {
throw new FileConnectorException(
FileConnectorErrorCode.FILE_LIST_GET_FAILED, errorMsg, e);
}
- if (filePaths.isEmpty()) {
- throw new FileConnectorException(
- FileConnectorErrorCode.FILE_LIST_EMPTY,
- "The target file list is empty,"
- + "SeaTunnel will not be able to sync empty
table");
- }
+
// support user-defined schema
FileFormat fileFormat =
FileFormat.valueOf(
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java
index fa65628bd5..7b1e32d1da 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java
@@ -112,4 +112,11 @@ public class BaseSourceConfig {
.stringType()
.noDefaultValue()
.withDescription("To be read sheet name,only valid for
excel files");
+
+ public static final Option<String> FILE_FILTER_PATTERN =
+ Options.key("file_filter_pattern")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "File name pattern (regular expression). The connector only
reads files whose names match the pattern.");
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
index ea6c902c05..e4e1694f30 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
@@ -24,6 +24,8 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
import org.apache.hadoop.conf.Configuration;
@@ -43,6 +45,8 @@ import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
+import java.util.regex.Pattern;
import static org.apache.parquet.avro.AvroReadSupport.READ_INT96_AS_FIXED;
import static
org.apache.parquet.avro.AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS;
@@ -74,6 +78,9 @@ public abstract class AbstractReadStrategy implements
ReadStrategy {
protected long skipHeaderNumber =
BaseSourceConfig.SKIP_HEADER_ROW_NUMBER.defaultValue();
protected transient boolean isKerberosAuthorization = false;
+ /** Compiled {@code file_filter_pattern}; {@code null} when the option is not set. */
+ protected Pattern pattern;
+
@Override
public void init(HadoopConf conf) {
this.hadoopConf = conf;
@@ -126,7 +133,7 @@ public abstract class AbstractReadStrategy implements
ReadStrategy {
fileNames.addAll(getFileNamesByPath(hadoopConf,
fileStatus.getPath().toString()));
continue;
}
- if (fileStatus.isFile()) {
+ if (fileStatus.isFile() && filterFileByPattern(fileStatus)) {
// filter '_SUCCESS' file
if (!fileStatus.getPath().getName().equals("_SUCCESS")
&& !fileStatus.getPath().getName().startsWith(".")) {
@@ -146,6 +153,19 @@ public abstract class AbstractReadStrategy implements
ReadStrategy {
}
}
}
+
+ // NOTE(review): the directory branch above calls getFileNamesByPath recursively, so an
+ // empty sub-directory (or one whose files are all rejected by file_filter_pattern)
+ // appears to make this throw even when sibling directories contain matching files.
+ // Consider performing this check only once, in the top-level caller.
+ if (fileNames.isEmpty()) {
+ throw new FileConnectorException(
+ FileConnectorErrorCode.FILE_LIST_EMPTY,
+ "The target file list is empty, "
+ + "SeaTunnel will not be able to sync empty table,
"
+ + "please check the configuration parameters such
as: [file_filter_pattern]");
+ }
+
return fileNames;
}
@@ -166,6 +186,12 @@ public abstract class AbstractReadStrategy implements
ReadStrategy {
if (pluginConfig.hasPath(BaseSourceConfig.READ_COLUMNS.key())) {
readColumns.addAll(pluginConfig.getStringList(BaseSourceConfig.READ_COLUMNS.key()));
}
+ if (pluginConfig.hasPath(BaseSourceConfig.FILE_FILTER_PATTERN.key())) {
+ String filterPattern =
+ pluginConfig.getString(BaseSourceConfig.FILE_FILTER_PATTERN.key());
+ // The option value is a regular expression and must be compiled as-is (no escaping).
+ this.pattern = Pattern.compile(filterPattern);
+ }
}
@Override
@@ -214,4 +240,17 @@ public abstract class AbstractReadStrategy implements
ReadStrategy {
// return merge row type
return new SeaTunnelRowType(newFieldNames, newFieldTypes);
}
+
+ /**
+ * Checks whether a file passes the user-configured {@code file_filter_pattern}.
+ *
+ * @param fileStatus the file to check; only its name (last path component) is matched
+ * @return {@code true} if no pattern is configured or the whole file name matches it
+ */
+ protected boolean filterFileByPattern(FileStatus fileStatus) {
+ if (Objects.nonNull(pattern)) {
+ return pattern.matcher(fileStatus.getPath().getName()).matches();
+ }
+ return true;
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSourceFactory.java
b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSourceFactory.java
index d0b781f1a1..496e9277f4 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSourceFactory.java
@@ -60,6 +60,7 @@ public class CosFileSourceFactory implements
TableSourceFactory {
.optional(BaseSourceConfig.DATE_FORMAT)
.optional(BaseSourceConfig.DATETIME_FORMAT)
.optional(BaseSourceConfig.TIME_FORMAT)
+ .optional(BaseSourceConfig.FILE_FILTER_PATTERN)
.build();
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java
index d2d11da5b4..4ab637c434 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java
@@ -60,6 +60,7 @@ public class FtpFileSourceFactory implements
TableSourceFactory {
.optional(BaseSourceConfig.DATE_FORMAT)
.optional(BaseSourceConfig.DATETIME_FORMAT)
.optional(BaseSourceConfig.TIME_FORMAT)
+ .optional(BaseSourceConfig.FILE_FILTER_PATTERN)
.build();
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java
b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java
index d4c1738490..c3d406d62c 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java
@@ -57,6 +57,7 @@ public class HdfsFileSourceFactory implements
TableSourceFactory {
.optional(BaseSourceConfig.DATE_FORMAT)
.optional(BaseSourceConfig.DATETIME_FORMAT)
.optional(BaseSourceConfig.TIME_FORMAT)
+ .optional(BaseSourceConfig.FILE_FILTER_PATTERN)
.build();
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
b/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
index 5025676762..eaea7bccb6 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
@@ -60,6 +60,7 @@ public class OssFileSourceFactory implements
TableSourceFactory {
.optional(BaseSourceConfig.DATE_FORMAT)
.optional(BaseSourceConfig.DATETIME_FORMAT)
.optional(BaseSourceConfig.TIME_FORMAT)
+ .optional(BaseSourceConfig.FILE_FILTER_PATTERN)
.build();
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
index 4ae2ae3a9b..03ec8660ce 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
@@ -56,6 +56,7 @@ public class LocalFileSourceFactory implements
TableSourceFactory {
.optional(BaseSourceConfig.DATE_FORMAT)
.optional(BaseSourceConfig.DATETIME_FORMAT)
.optional(BaseSourceConfig.TIME_FORMAT)
+ .optional(BaseSourceConfig.FILE_FILTER_PATTERN)
.build();
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
index c6a2d70409..e7d862bd44 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
@@ -60,6 +60,7 @@ public class OssFileSourceFactory implements
TableSourceFactory {
.optional(BaseSourceConfig.DATE_FORMAT)
.optional(BaseSourceConfig.DATETIME_FORMAT)
.optional(BaseSourceConfig.TIME_FORMAT)
+ .optional(BaseSourceConfig.FILE_FILTER_PATTERN)
.build();
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java
index 71156a21b6..a3b4808865 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java
@@ -65,6 +65,7 @@ public class S3FileSourceFactory implements
TableSourceFactory {
.optional(BaseSourceConfig.DATE_FORMAT)
.optional(BaseSourceConfig.DATETIME_FORMAT)
.optional(BaseSourceConfig.TIME_FORMAT)
+ .optional(BaseSourceConfig.FILE_FILTER_PATTERN)
.build();
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java
b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java
index 18cda2fbe5..e9efe1cdf9 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java
@@ -60,6 +60,7 @@ public class SftpFileSourceFactory implements
TableSourceFactory {
.optional(BaseSourceConfig.DATE_FORMAT)
.optional(BaseSourceConfig.DATETIME_FORMAT)
.optional(BaseSourceConfig.TIME_FORMAT)
+ .optional(BaseSourceConfig.FILE_FILTER_PATTERN)
.build();
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java
index 5fc0e48609..15a58ebf08 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java
@@ -21,23 +21,20 @@ import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.container.TestHelper;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.seatunnel.e2e.common.util.ContainerUtil;
import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestTemplate;
-import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
-import org.testcontainers.utility.MountableFile;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
-import java.nio.file.Path;
import java.util.Collections;
import java.util.stream.Stream;
@@ -87,19 +84,33 @@ public class FtpFileIT extends TestSuiteBase implements
TestResource {
Startables.deepStart(Stream.of(ftpContainer)).join();
log.info("ftp container started");
- Path jsonPath =
ContainerUtil.getResourcesFile("/json/e2e.json").toPath();
- Path textPath =
ContainerUtil.getResourcesFile("/text/e2e.txt").toPath();
- Path excelPath =
ContainerUtil.getResourcesFile("/excel/e2e.xlsx").toPath();
-
- ftpContainer.copyFileToContainer(
- MountableFile.forHostPath(jsonPath),
-
"/home/vsftpd/seatunnel/tmp/seatunnel/read/json/name=tyrantlucifer/hobby=coding/e2e.json");
- ftpContainer.copyFileToContainer(
- MountableFile.forHostPath(textPath),
-
"/home/vsftpd/seatunnel/tmp/seatunnel/read/text/name=tyrantlucifer/hobby=coding/e2e.txt");
- ftpContainer.copyFileToContainer(
- MountableFile.forHostPath(excelPath),
-
"/home/vsftpd/seatunnel/tmp/seatunnel/read/excel/name=tyrantlucifer/hobby=coding/e2e.xlsx");
+ ContainerUtil.copyFileIntoContainers(
+ "/json/e2e.json",
+
"/home/vsftpd/seatunnel/tmp/seatunnel/read/json/name=tyrantlucifer/hobby=coding/e2e.json",
+ ftpContainer);
+
+ ContainerUtil.copyFileIntoContainers(
+ "/text/e2e.txt",
+
"/home/vsftpd/seatunnel/tmp/seatunnel/read/text/name=tyrantlucifer/hobby=coding/e2e.txt",
+ ftpContainer);
+
+ ContainerUtil.copyFileIntoContainers(
+ "/excel/e2e.xlsx",
+
"/home/vsftpd/seatunnel/tmp/seatunnel/read/excel/name=tyrantlucifer/hobby=coding/e2e.xlsx",
+ ftpContainer);
+
+ ContainerUtil.copyFileIntoContainers(
+ "/excel/e2e.xlsx",
+
"/home/vsftpd/seatunnel/tmp/seatunnel/read/excel_filter/name=tyrantlucifer/hobby=coding/e2e_filter.xlsx",
+ ftpContainer);
+
+ // Also place a non-matching file in the filter directory so that
+ // ftp_filter_excel_to_assert.conf (file_filter_pattern = "e2e_filter.*") really filters.
+ ContainerUtil.copyFileIntoContainers(
+ "/excel/e2e.xlsx",
+
"/home/vsftpd/seatunnel/tmp/seatunnel/read/excel_filter/name=tyrantlucifer/hobby=coding/e2e.xlsx",
+ ftpContainer);
+
ftpContainer.execInContainer("sh", "-c", "chmod -R 777
/home/vsftpd/seatunnel/");
ftpContainer.execInContainer("sh", "-c", "chown -R ftp:ftp
/home/vsftpd/seatunnel/");
}
@@ -107,51 +118,31 @@ public class FtpFileIT extends TestSuiteBase implements
TestResource {
@TestTemplate
public void testFtpFileReadAndWrite(TestContainer container)
throws IOException, InterruptedException {
+ TestHelper helper = new TestHelper(container);
// test write ftp excel file
- Container.ExecResult excelWriteResult =
- container.executeJob("/excel/fake_source_to_ftp_excel.conf");
- Assertions.assertEquals(0, excelWriteResult.getExitCode(),
excelWriteResult.getStderr());
+ helper.execute("/excel/fake_source_to_ftp_excel.conf");
// test read ftp excel file
- Container.ExecResult excelReadResult =
- container.executeJob("/excel/ftp_excel_to_assert.conf");
- Assertions.assertEquals(0, excelReadResult.getExitCode(),
excelReadResult.getStderr());
+ helper.execute("/excel/ftp_excel_to_assert.conf");
// test read ftp excel file with projection
- Container.ExecResult excelProjectionReadResult =
-
container.executeJob("/excel/ftp_excel_projection_to_assert.conf");
- Assertions.assertEquals(
- 0, excelProjectionReadResult.getExitCode(),
excelProjectionReadResult.getStderr());
+ helper.execute("/excel/ftp_excel_projection_to_assert.conf");
+ // test read ftp excel file with filter
+ helper.execute("/excel/ftp_filter_excel_to_assert.conf");
// test write ftp text file
- Container.ExecResult textWriteResult =
- container.executeJob("/text/fake_to_ftp_file_text.conf");
- Assertions.assertEquals(0, textWriteResult.getExitCode());
+ helper.execute("/text/fake_to_ftp_file_text.conf");
// test read skip header
- Container.ExecResult textWriteAndSkipResult =
- container.executeJob("/text/ftp_file_text_skip_headers.conf");
- Assertions.assertEquals(0, textWriteAndSkipResult.getExitCode());
+ helper.execute("/text/ftp_file_text_skip_headers.conf");
// test read ftp text file
- Container.ExecResult textReadResult =
- container.executeJob("/text/ftp_file_text_to_assert.conf");
- Assertions.assertEquals(0, textReadResult.getExitCode());
+ helper.execute("/text/ftp_file_text_to_assert.conf");
// test read ftp text file with projection
- Container.ExecResult textProjectionResult =
-
container.executeJob("/text/ftp_file_text_projection_to_assert.conf");
- Assertions.assertEquals(0, textProjectionResult.getExitCode());
+ helper.execute("/text/ftp_file_text_projection_to_assert.conf");
// test write ftp json file
- Container.ExecResult jsonWriteResult =
- container.executeJob("/json/fake_to_ftp_file_json.conf");
- Assertions.assertEquals(0, jsonWriteResult.getExitCode());
+ helper.execute("/json/fake_to_ftp_file_json.conf");
// test read ftp json file
- Container.ExecResult jsonReadResult =
- container.executeJob("/json/ftp_file_json_to_assert.conf");
- Assertions.assertEquals(0, jsonReadResult.getExitCode());
+ helper.execute("/json/ftp_file_json_to_assert.conf");
// test write ftp parquet file
- Container.ExecResult parquetWriteResult =
- container.executeJob("/parquet/fake_to_ftp_file_parquet.conf");
- Assertions.assertEquals(0, parquetWriteResult.getExitCode());
+ helper.execute("/parquet/fake_to_ftp_file_parquet.conf");
// test write ftp orc file
- Container.ExecResult orcWriteResult =
- container.executeJob("/orc/fake_to_ftp_file_orc.conf");
- Assertions.assertEquals(0, orcWriteResult.getExitCode());
+ helper.execute("/orc/fake_to_ftp_file_orc.conf");
}
@AfterAll
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/excel/ftp_filter_excel_to_assert.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/excel/ftp_filter_excel_to_assert.conf
new file mode 100644
index 0000000000..6af42f6f3d
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/excel/ftp_filter_excel_to_assert.conf
@@ -0,0 +1,141 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ # You can set flink configuration here
+ execution.parallelism = 1
+ job.mode = "BATCH"
+
+ # You can set spark configuration here
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 1
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ FtpFile {
+ host = "ftp"
+ port = 21
+ user = seatunnel
+ password = pass
+ path = "/tmp/seatunnel/read/excel_filter"
+ result_table_name = "ftp"
+ file_format_type = excel
+ delimiter = ;
+ skip_header_row_number = 1
+ file_filter_pattern = "e2e_filter.*"
+ schema = {
+ fields {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ c_row = {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ }
+ }
+ }
+ }
+}
+
+
+sink {
+ Assert {
+ source_table_name = "ftp"
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 5
+ }
+ ],
+ field_rules = [
+ {
+ field_name = c_string
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_boolean
+ field_type = boolean
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_double
+ field_type = double
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = name
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = hobby
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
+
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
index f5c220deab..aed3576726 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
@@ -21,17 +21,14 @@ import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.container.TestContainerId;
+import org.apache.seatunnel.e2e.common.container.TestHelper;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
import org.apache.seatunnel.e2e.common.util.ContainerUtil;
-import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
-import org.testcontainers.containers.Container;
-import org.testcontainers.utility.MountableFile;
import java.io.IOException;
-import java.nio.file.Path;
@DisabledOnContainer(
value = {TestContainerId.SPARK_2_4},
@@ -43,88 +40,70 @@ public class LocalFileIT extends TestSuiteBase {
@TestContainerExtension
private final ContainerExtendedFactory extendedFactory =
container -> {
- Path jsonPath =
ContainerUtil.getResourcesFile("/json/e2e.json").toPath();
- Path orcPath =
ContainerUtil.getResourcesFile("/orc/e2e.orc").toPath();
- Path parquetPath =
ContainerUtil.getResourcesFile("/parquet/e2e.parquet").toPath();
- Path textPath =
ContainerUtil.getResourcesFile("/text/e2e.txt").toPath();
- Path excelPath =
ContainerUtil.getResourcesFile("/excel/e2e.xlsx").toPath();
- container.copyFileToContainer(
- MountableFile.forHostPath(jsonPath),
-
"/seatunnel/read/json/name=tyrantlucifer/hobby=coding/e2e.json");
- container.copyFileToContainer(
- MountableFile.forHostPath(orcPath),
-
"/seatunnel/read/orc/name=tyrantlucifer/hobby=coding/e2e.orc");
- container.copyFileToContainer(
- MountableFile.forHostPath(parquetPath),
-
"/seatunnel/read/parquet/name=tyrantlucifer/hobby=coding/e2e.parquet");
- container.copyFileToContainer(
- MountableFile.forHostPath(textPath),
-
"/seatunnel/read/text/name=tyrantlucifer/hobby=coding/e2e.txt");
- container.copyFileToContainer(
- MountableFile.forHostPath(excelPath),
-
"/seatunnel/read/excel/name=tyrantlucifer/hobby=coding/e2e.xlsx");
+ ContainerUtil.copyFileIntoContainers(
+ "/json/e2e.json",
+
"/seatunnel/read/json/name=tyrantlucifer/hobby=coding/e2e.json",
+ container);
+
+ ContainerUtil.copyFileIntoContainers(
+ "/text/e2e.txt",
+
"/seatunnel/read/text/name=tyrantlucifer/hobby=coding/e2e.txt",
+ container);
+
+ ContainerUtil.copyFileIntoContainers(
+ "/excel/e2e.xlsx",
+
"/seatunnel/read/excel/name=tyrantlucifer/hobby=coding/e2e.xlsx",
+ container);
+
+ ContainerUtil.copyFileIntoContainers(
+ "/orc/e2e.orc",
+
"/seatunnel/read/orc/name=tyrantlucifer/hobby=coding/e2e.orc",
+ container);
+
+ ContainerUtil.copyFileIntoContainers(
+ "/parquet/e2e.parquet",
+
"/seatunnel/read/parquet/name=tyrantlucifer/hobby=coding/e2e.parquet",
+ container);
+
+ ContainerUtil.copyFileIntoContainers(
+ "/excel/e2e.xlsx",
+
"/seatunnel/read/excel_filter/name=tyrantlucifer/hobby=coding/e2e_filter.xlsx",
+ container);
};
@TestTemplate
public void testLocalFileReadAndWrite(TestContainer container)
throws IOException, InterruptedException {
- Container.ExecResult excelWriteResult =
- container.executeJob("/excel/fake_to_local_excel.conf");
- Assertions.assertEquals(0, excelWriteResult.getExitCode(),
excelWriteResult.getStderr());
- Container.ExecResult excelReadResult =
- container.executeJob("/excel/local_excel_to_assert.conf");
- Assertions.assertEquals(0, excelReadResult.getExitCode(),
excelReadResult.getStderr());
- Container.ExecResult excelProjectionReadResult =
-
container.executeJob("/excel/local_excel_projection_to_assert.conf");
- Assertions.assertEquals(
- 0, excelProjectionReadResult.getExitCode(),
excelProjectionReadResult.getStderr());
+ TestHelper helper = new TestHelper(container);
+
+ helper.execute("/excel/fake_to_local_excel.conf");
+ helper.execute("/excel/local_excel_to_assert.conf");
+ helper.execute("/excel/local_excel_projection_to_assert.conf");
// test write local text file
- Container.ExecResult textWriteResult =
- container.executeJob("/text/fake_to_local_file_text.conf");
- Assertions.assertEquals(0, textWriteResult.getExitCode());
+ helper.execute("/text/fake_to_local_file_text.conf");
// test read skip header
- Container.ExecResult textWriteAndSkipResult =
-
container.executeJob("/text/local_file_text_skip_headers.conf");
- Assertions.assertEquals(0, textWriteAndSkipResult.getExitCode());
+ helper.execute("/text/local_file_text_skip_headers.conf");
// test read local text file
- Container.ExecResult textReadResult =
- container.executeJob("/text/local_file_text_to_assert.conf");
- Assertions.assertEquals(0, textReadResult.getExitCode());
+ helper.execute("/text/local_file_text_to_assert.conf");
// test read local text file with projection
- Container.ExecResult textProjectionResult =
-
container.executeJob("/text/local_file_text_projection_to_assert.conf");
- Assertions.assertEquals(0, textProjectionResult.getExitCode());
+ helper.execute("/text/local_file_text_projection_to_assert.conf");
// test write local json file
- Container.ExecResult jsonWriteResult =
- container.executeJob("/json/fake_to_local_file_json.conf");
- Assertions.assertEquals(0, jsonWriteResult.getExitCode());
+ helper.execute("/json/fake_to_local_file_json.conf");
// test read local json file
- Container.ExecResult jsonReadResult =
- container.executeJob("/json/local_file_json_to_assert.conf");
- Assertions.assertEquals(0, jsonReadResult.getExitCode());
+ helper.execute("/json/local_file_json_to_assert.conf");
// test write local orc file
- Container.ExecResult orcWriteResult =
- container.executeJob("/orc/fake_to_local_file_orc.conf");
- Assertions.assertEquals(0, orcWriteResult.getExitCode());
+ helper.execute("/orc/fake_to_local_file_orc.conf");
// test read local orc file
- Container.ExecResult orcReadResult =
- container.executeJob("/orc/local_file_orc_to_assert.conf");
- Assertions.assertEquals(0, orcReadResult.getExitCode());
+ helper.execute("/orc/local_file_orc_to_assert.conf");
// test read local orc file with projection
- Container.ExecResult orcProjectionResult =
-
container.executeJob("/orc/local_file_orc_projection_to_assert.conf");
- Assertions.assertEquals(0, orcProjectionResult.getExitCode());
+ helper.execute("/orc/local_file_orc_projection_to_assert.conf");
// test write local parquet file
- Container.ExecResult parquetWriteResult =
-
container.executeJob("/parquet/fake_to_local_file_parquet.conf");
- Assertions.assertEquals(0, parquetWriteResult.getExitCode());
+ helper.execute("/parquet/fake_to_local_file_parquet.conf");
// test read local parquet file
- Container.ExecResult parquetReadResult =
-
container.executeJob("/parquet/local_file_parquet_to_assert.conf");
- Assertions.assertEquals(0, parquetReadResult.getExitCode());
+ helper.execute("/parquet/local_file_parquet_to_assert.conf");
// test read local parquet file with projection
- Container.ExecResult parquetProjectionResult =
-
container.executeJob("/parquet/local_file_parquet_projection_to_assert.conf");
- Assertions.assertEquals(0, parquetProjectionResult.getExitCode());
+
helper.execute("/parquet/local_file_parquet_projection_to_assert.conf");
+ // test read filtered local file
+ helper.execute("/excel/local_filter_excel_to_assert.conf");
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/excel/local_filter_excel_to_assert.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/excel/local_filter_excel_to_assert.conf
new file mode 100644
index 0000000000..86039b44db
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/excel/local_filter_excel_to_assert.conf
@@ -0,0 +1,131 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+ job.mode = "BATCH"
+}
+
+source {
+ LocalFile {
+ path = "/seatunnel/read/excel_filter"
+ result_table_name = "fake"
+ file_format_type = excel
+ delimiter = ;
+ skip_header_row_number = 1
+ file_filter_pattern = "e2e_filter.*"
+ schema = {
+ fields {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ c_row = {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ }
+ }
+ }
+ }
+}
+
+sink {
+ Assert {
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 5
+ }
+ ],
+ field_rules = [
+ {
+ field_name = c_string
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_boolean
+ field_type = boolean
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_double
+ field_type = double
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = name
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = hobby
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/fstp/SftpFileIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/fstp/SftpFileIT.java
index 82d1be73db..e5fbcb5f5e 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/fstp/SftpFileIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/fstp/SftpFileIT.java
@@ -21,22 +21,19 @@ import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.container.TestContainerId;
+import org.apache.seatunnel.e2e.common.container.TestHelper;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.seatunnel.e2e.common.util.ContainerUtil;
import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestTemplate;
-import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.lifecycle.Startables;
-import org.testcontainers.utility.MountableFile;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
-import java.nio.file.Path;
import java.util.Collections;
import java.util.stream.Stream;
@@ -75,61 +72,54 @@ public class SftpFileIT extends TestSuiteBase implements
TestResource {
sftpContainer.start();
Startables.deepStart(Stream.of(sftpContainer)).join();
log.info("Sftp container started");
- Path jsonPath =
ContainerUtil.getResourcesFile("/json/e2e.json").toPath();
- Path textPath =
ContainerUtil.getResourcesFile("/text/e2e.txt").toPath();
- Path excelPath =
ContainerUtil.getResourcesFile("/excel/e2e.xlsx").toPath();
- sftpContainer.copyFileToContainer(
- MountableFile.forHostPath(jsonPath),
-
"/home/seatunnel/tmp/seatunnel/read/json/name=tyrantlucifer/hobby=coding/e2e.json");
- sftpContainer.copyFileToContainer(
- MountableFile.forHostPath(textPath),
-
"/home/seatunnel/tmp/seatunnel/read/text/name=tyrantlucifer/hobby=coding/e2e.txt");
- sftpContainer.copyFileToContainer(
- MountableFile.forHostPath(excelPath),
-
"/home/seatunnel/tmp/seatunnel/read/excel/name=tyrantlucifer/hobby=coding/e2e.xlsx");
+
+ ContainerUtil.copyFileIntoContainers(
+ "/json/e2e.json",
+
"/home/seatunnel/tmp/seatunnel/read/json/name=tyrantlucifer/hobby=coding/e2e.json",
+ sftpContainer);
+
+ ContainerUtil.copyFileIntoContainers(
+ "/text/e2e.txt",
+
"/home/seatunnel/tmp/seatunnel/read/text/name=tyrantlucifer/hobby=coding/e2e.txt",
+ sftpContainer);
+
+ ContainerUtil.copyFileIntoContainers(
+ "/excel/e2e.xlsx",
+
"/home/seatunnel/tmp/seatunnel/read/excel/name=tyrantlucifer/hobby=coding/e2e.xlsx",
+ sftpContainer);
+
+ ContainerUtil.copyFileIntoContainers(
+ "/excel/e2e.xlsx",
+
"/home/seatunnel/tmp/seatunnel/read/excel_filter/name=tyrantlucifer/hobby=coding/e2e_filter.xlsx",
+ sftpContainer);
+
sftpContainer.execInContainer("sh", "-c", "chown -R seatunnel
/home/seatunnel/tmp/");
}
@TestTemplate
public void testSftpFileReadAndWrite(TestContainer container)
throws IOException, InterruptedException {
+ TestHelper helper = new TestHelper(container);
// test write sftp excel file
- Container.ExecResult excelWriteResult =
- container.executeJob("/excel/fakesource_to_sftp_excel.conf");
- Assertions.assertEquals(0, excelWriteResult.getExitCode(),
excelWriteResult.getStderr());
+ helper.execute("/excel/fakesource_to_sftp_excel.conf");
// test read sftp excel file
- Container.ExecResult excelReadResult =
- container.executeJob("/excel/sftp_excel_to_assert.conf");
- Assertions.assertEquals(0, excelReadResult.getExitCode(),
excelReadResult.getStderr());
+ helper.execute("/excel/sftp_excel_to_assert.conf");
// test read sftp excel file with projection
- Container.ExecResult excelProjectionReadResult =
-
container.executeJob("/excel/sftp_excel_projection_to_assert.conf");
- Assertions.assertEquals(
- 0, excelProjectionReadResult.getExitCode(),
excelProjectionReadResult.getStderr());
+ helper.execute("/excel/sftp_excel_projection_to_assert.conf");
+ // test read sftp excel file with filter pattern
+ helper.execute("/excel/sftp_filter_excel_to_assert.conf");
// test write sftp text file
- Container.ExecResult textWriteResult =
- container.executeJob("/text/fake_to_sftp_file_text.conf");
- Assertions.assertEquals(0, textWriteResult.getExitCode());
+ helper.execute("/text/fake_to_sftp_file_text.conf");
// test read skip header
- Container.ExecResult textWriteAndSkipResult =
- container.executeJob("/text/sftp_file_text_skip_headers.conf");
- Assertions.assertEquals(0, textWriteAndSkipResult.getExitCode());
+ helper.execute("/text/sftp_file_text_skip_headers.conf");
// test read sftp text file
- Container.ExecResult textReadResult =
- container.executeJob("/text/sftp_file_text_to_assert.conf");
- Assertions.assertEquals(0, textReadResult.getExitCode());
+ helper.execute("/text/sftp_file_text_to_assert.conf");
// test read sftp text file with projection
- Container.ExecResult textProjectionResult =
-
container.executeJob("/text/sftp_file_text_projection_to_assert.conf");
- Assertions.assertEquals(0, textProjectionResult.getExitCode());
+ helper.execute("/text/sftp_file_text_projection_to_assert.conf");
// test write sftp json file
- Container.ExecResult jsonWriteResult =
- container.executeJob("/json/fake_to_sftp_file_json.conf");
- Assertions.assertEquals(0, jsonWriteResult.getExitCode());
+ helper.execute("/json/fake_to_sftp_file_json.conf");
// test read sftp json file
- Container.ExecResult jsonReadResult =
- container.executeJob("/json/sftp_file_json_to_assert.conf");
- Assertions.assertEquals(0, jsonReadResult.getExitCode());
+ helper.execute("/json/sftp_file_json_to_assert.conf");
}
@AfterAll
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/excel/sftp_filter_excel_to_assert.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/excel/sftp_filter_excel_to_assert.conf
new file mode 100644
index 0000000000..b6cd92f712
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/excel/sftp_filter_excel_to_assert.conf
@@ -0,0 +1,132 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ # You can set flink configuration here
+ execution.parallelism = 1
+ job.mode = "BATCH"
+
+ # You can set spark configuration here
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 1
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ SftpFile {
+ path = "tmp/seatunnel/read/excel_filter"
+ result_table_name = "sftp"
+ file_format_type = excel
+ host = "sftp"
+ port = 22
+ user = seatunnel
+ password = pass
+ delimiter = ";"
+ file_filter_pattern = "e2e_filter.*"
+ skip_header_row_number = 1
+ schema = {
+ fields {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ c_row = {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ }
+ }
+ }
+ }
+}
+
+sink {
+ Assert {
+ source_table_name = "sftp"
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 5
+ }
+ ],
+ field_rules = [
+ {
+ field_name = c_string
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_boolean
+ field_type = boolean
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_double
+ field_type = double
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
+
diff --git
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestHelper.java
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestHelper.java
new file mode 100644
index 0000000000..a88723f820
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestHelper.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.common.container;
+
+import org.junit.jupiter.api.Assertions;
+import org.testcontainers.containers.Container;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * Convenience wrapper around a {@link TestContainer} for e2e tests: submits a job config through
+ * {@link TestContainer#executeJob} and asserts on the resulting exit code.
+ */
+public class TestHelper {
+    private final TestContainer container;
+
+    /**
+     * @param container the container that jobs are submitted to; must not be {@code null}
+     * @throws NullPointerException if {@code container} is {@code null}
+     */
+    public TestHelper(TestContainer container) {
+        this.container = Objects.requireNonNull(container, "container");
+    }
+
+    /**
+     * Runs the given job config and asserts that it exits with code {@code 0}.
+     *
+     * @param file job config path as accepted by {@link TestContainer#executeJob}, e.g. {@code
+     *     "/excel/local_excel_to_assert.conf"}
+     */
+    public void execute(String file) throws IOException, InterruptedException {
+        execute(0, file);
+    }
+
+    /**
+     * Runs the given job config and asserts that it exits with {@code expectedExitCode}.
+     *
+     * <p>On mismatch the failure message names the job file and includes the job's stderr, so a
+     * failing step in a long sequence of {@code execute} calls can be identified directly.
+     *
+     * @param expectedExitCode exit code the job is expected to finish with
+     * @param file job config path as accepted by {@link TestContainer#executeJob}
+     */
+    public void execute(int expectedExitCode, String file)
+            throws IOException, InterruptedException {
+        Container.ExecResult result = container.executeJob(file);
+        // Supplier defers building the (potentially large) message until the assertion fails.
+        Assertions.assertEquals(
+                expectedExitCode,
+                result.getExitCode(),
+                () ->
+                        "Job "
+                                + file
+                                + " exited with an unexpected exit code, stderr:\n"
+                                + result.getStderr());
+    }
+}
diff --git
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java
index 92d6100a7c..fa5660a170 100644
---
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java
+++
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java
@@ -246,4 +246,10 @@ public final class ContainerUtil {
throw new FactoryException("Could not load service provider for
containers.", e);
}
}
+
+    /**
+     * Copies a test resource from the host into {@code container} at {@code targetPath}.
+     *
+     * <p>{@code fileName} is resolved through {@link #getResourcesFile}, using the same resource
+     * paths the e2e tests use elsewhere (e.g. {@code "/excel/e2e.xlsx"}). Despite the plural in
+     * the name, each call targets exactly one container.
+     *
+     * @param fileName resource path of the file to copy
+     * @param targetPath absolute destination path inside the container
+     * @param container container that receives the file
+     */
+    public static void copyFileIntoContainers(
+            String fileName, String targetPath, GenericContainer<?> container) {
+        Path hostPath = getResourcesFile(fileName).toPath();
+        MountableFile mountable = MountableFile.forHostPath(hostPath);
+        container.copyFileToContainer(mountable, targetPath);
+    }
}