This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new a3c13e59eb [Feature][Connector V2][File] Add config of 
'file_filter_pattern', which used for filtering files. (#5153)
a3c13e59eb is described below

commit a3c13e59eb30d603f326442bd784a0eb85f350a6
Author: FlechazoW <[email protected]>
AuthorDate: Mon Jul 31 17:33:36 2023 +0800

    [Feature][Connector V2][File] Add config of 'file_filter_pattern', which 
used for filtering files. (#5153)
    
    * [Feature][Connector V2][File] Add config of 'file_filter_pattern', which 
used for filtering files.
---
 docs/en/connector-v2/source/CosFile.md             |   5 +
 docs/en/connector-v2/source/FtpFile.md             |   1 +
 docs/en/connector-v2/source/HdfsFile.md            |   5 +
 docs/en/connector-v2/source/LocalFile.md           |   5 +
 docs/en/connector-v2/source/OssFile.md             |   5 +
 docs/en/connector-v2/source/OssJindoFile.md        |   5 +
 docs/en/connector-v2/source/S3File.md              |   5 +
 docs/en/connector-v2/source/SftpFile.md            |   5 +
 .../file/hdfs/source/BaseHdfsFileSource.java       |   7 +-
 .../seatunnel/file/config/BaseSourceConfig.java    |   7 +
 .../file/source/reader/AbstractReadStrategy.java   |  30 ++++-
 .../file/cos/source/CosFileSourceFactory.java      |   1 +
 .../file/ftp/source/FtpFileSourceFactory.java      |   1 +
 .../file/hdfs/source/HdfsFileSourceFactory.java    |   1 +
 .../file/oss/source/OssFileSourceFactory.java      |   1 +
 .../file/local/source/LocalFileSourceFactory.java  |   1 +
 .../file/oss/source/OssFileSourceFactory.java      |   1 +
 .../file/s3/source/S3FileSourceFactory.java        |   1 +
 .../file/sftp/source/SftpFileSourceFactory.java    |   1 +
 .../e2e/connector/file/ftp/FtpFileIT.java          |  86 +++++--------
 .../excel/ftp_filter_excel_to_assert.conf          | 141 +++++++++++++++++++++
 .../e2e/connector/file/local/LocalFileIT.java      | 119 +++++++----------
 .../excel/local_filter_excel_to_assert.conf        | 131 +++++++++++++++++++
 .../e2e/connector/file/fstp/SftpFileIT.java        |  78 +++++-------
 .../excel/sftp_filter_excel_to_assert.conf         | 132 +++++++++++++++++++
 .../seatunnel/e2e/common/container/TestHelper.java |  40 ++++++
 .../seatunnel/e2e/common/util/ContainerUtil.java   |   6 +
 27 files changed, 649 insertions(+), 172 deletions(-)

diff --git a/docs/en/connector-v2/source/CosFile.md 
b/docs/en/connector-v2/source/CosFile.md
index 18fc0299c9..dd1e77ebcf 100644
--- a/docs/en/connector-v2/source/CosFile.md
+++ b/docs/en/connector-v2/source/CosFile.md
@@ -55,6 +55,7 @@ Read all the data in a split in a pollNext call. What splits 
are read will be sa
 | schema                    | config  | no       | -                   |
 | common-options            |         | no       | -                   |
 | sheet_name                | string  | no       | -                   |
+| file_filter_pattern       | string  | no       | -                   |
 
 ### path [string]
 
@@ -247,6 +248,10 @@ Source plugin common parameters, please refer to [Source 
Common Options](common-
 
 Reader the sheet of the workbook,Only used when file_format is excel.
 
+### file_filter_pattern [string]
+
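+Filter pattern used to select which files are read. It is matched against each file's name (not its full path), and only files whose whole name matches are read; for example, `e2e_filter.*` reads only files whose names start with `e2e_filter`.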
+
 ## Example
 
 ```hocon
diff --git a/docs/en/connector-v2/source/FtpFile.md 
b/docs/en/connector-v2/source/FtpFile.md
index b550bde8ba..6737511e63 100644
--- a/docs/en/connector-v2/source/FtpFile.md
+++ b/docs/en/connector-v2/source/FtpFile.md
@@ -48,6 +48,7 @@ If you use SeaTunnel Engine, It automatically integrated the 
hadoop jar when you
 | schema                    | config  | no       | -                   |
 | common-options            |         | no       | -                   |
 | sheet_name                | string  | no       | -                   |
+| file_filter_pattern       | string  | no       | -                   |
 
 ### host [string]
 
diff --git a/docs/en/connector-v2/source/HdfsFile.md 
b/docs/en/connector-v2/source/HdfsFile.md
index d255f4fd3a..1d285c539a 100644
--- a/docs/en/connector-v2/source/HdfsFile.md
+++ b/docs/en/connector-v2/source/HdfsFile.md
@@ -53,6 +53,7 @@ Read all the data in a split in a pollNext call. What splits 
are read will be sa
 | schema                    | config  | no       | -                   |
 | common-options            |         | no       | -                   |
 | sheet_name                | string  | no       | -                   |
+| file_filter_pattern       | string  | no       | -                   |
 
 ### path [string]
 
@@ -245,6 +246,10 @@ Source plugin common parameters, please refer to [Source 
Common Options](common-
 
 Reader the sheet of the workbook,Only used when file_format is excel.
 
+### file_filter_pattern [string]
+
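+Filter pattern used to select which files are read. It is matched against each file's name (not its full path), and only files whose whole name matches are read; for example, `e2e_filter.*` reads only files whose names start with `e2e_filter`.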
+
 ## Example
 
 ```hocon
diff --git a/docs/en/connector-v2/source/LocalFile.md 
b/docs/en/connector-v2/source/LocalFile.md
index d33288b7a5..7d80a4d2b7 100644
--- a/docs/en/connector-v2/source/LocalFile.md
+++ b/docs/en/connector-v2/source/LocalFile.md
@@ -49,6 +49,7 @@ Read all the data in a split in a pollNext call. What splits 
are read will be sa
 | schema                    | config  | no       | -                   |
 | common-options            |         | no       | -                   |
 | sheet_name                | string  | no       | -                   |
+| file_filter_pattern       | string  | no       | -                   |
 
 ### path [string]
 
@@ -225,6 +226,10 @@ Source plugin common parameters, please refer to [Source 
Common Options](common-
 
 Reader the sheet of the workbook,Only used when file_format is excel.
 
+### file_filter_pattern [string]
+
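+Filter pattern used to select which files are read. It is matched against each file's name (not its full path), and only files whose whole name matches are read; for example, `e2e_filter.*` reads only files whose names start with `e2e_filter`.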
+
 ## Example
 
 ```hocon
diff --git a/docs/en/connector-v2/source/OssFile.md 
b/docs/en/connector-v2/source/OssFile.md
index 532b4d03aa..12f2141cd6 100644
--- a/docs/en/connector-v2/source/OssFile.md
+++ b/docs/en/connector-v2/source/OssFile.md
@@ -56,6 +56,7 @@ Read all the data in a split in a pollNext call. What splits 
are read will be sa
 | schema                    | config  | no       | -                   |
 | common-options            |         | no       | -                   |
 | sheet_name                | string  | no       | -                   |
+| file_filter_pattern       | string  | no       | -                   |
 
 ### path [string]
 
@@ -282,6 +283,10 @@ Reader the sheet of the workbook,Only used when 
file_format is excel.
 
 ```
 
+### file_filter_pattern [string]
+
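+Filter pattern used to select which files are read. It is matched against each file's name (not its full path), and only files whose whole name matches are read; for example, `e2e_filter.*` reads only files whose names start with `e2e_filter`.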
+
 ## Changelog
 
 ### 2.2.0-beta 2022-09-26
diff --git a/docs/en/connector-v2/source/OssJindoFile.md 
b/docs/en/connector-v2/source/OssJindoFile.md
index 3e3649e19b..913d277683 100644
--- a/docs/en/connector-v2/source/OssJindoFile.md
+++ b/docs/en/connector-v2/source/OssJindoFile.md
@@ -56,6 +56,7 @@ Read all the data in a split in a pollNext call. What splits 
are read will be sa
 | schema                    | config  | no       | -                   |
 | common-options            |         | no       | -                   |
 | sheet_name                | string  | no       | -                   |
+| file_filter_pattern       | string  | no       | -                   |
 
 ### path [string]
 
@@ -248,6 +249,10 @@ Source plugin common parameters, please refer to [Source 
Common Options](common-
 
 Reader the sheet of the workbook,Only used when file_format is excel.
 
+### file_filter_pattern [string]
+
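+Filter pattern used to select which files are read. It is matched against each file's name (not its full path), and only files whose whole name matches are read; for example, `e2e_filter.*` reads only files whose names start with `e2e_filter`.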
+
 ## Example
 
 ```hocon
diff --git a/docs/en/connector-v2/source/S3File.md 
b/docs/en/connector-v2/source/S3File.md
index f58a1a6bc3..79a89be1c2 100644
--- a/docs/en/connector-v2/source/S3File.md
+++ b/docs/en/connector-v2/source/S3File.md
@@ -57,6 +57,7 @@ Read all the data in a split in a pollNext call. What splits 
are read will be sa
 | schema                          | config  | no       | -                     
                                |
 | common-options                  |         | no       | -                     
                                |
 | sheet_name                      | string  | no       | -                     
                                |
+| file_filter_pattern             | string  | no       | -                     
                                |
 
 ### path [string]
 
@@ -299,6 +300,10 @@ Reader the sheet of the workbook,Only used when 
file_format is excel.
 
 ```
 
+### file_filter_pattern [string]
+
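+Filter pattern used to select which files are read. It is matched against each file's name (not its full path), and only files whose whole name matches are read; for example, `e2e_filter.*` reads only files whose names start with `e2e_filter`.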
+
 ## Changelog
 
 ### 2.3.0-beta 2022-10-20
diff --git a/docs/en/connector-v2/source/SftpFile.md 
b/docs/en/connector-v2/source/SftpFile.md
index 500ec2af5b..22047d481e 100644
--- a/docs/en/connector-v2/source/SftpFile.md
+++ b/docs/en/connector-v2/source/SftpFile.md
@@ -47,6 +47,7 @@ If you use SeaTunnel Engine, It automatically integrated the 
hadoop jar when you
 | schema                    | config  | no       | -                   |
 | common-options            |         | no       | -                   |
 | sheet_name                | string  | no       | -                   |
+| file_filter_pattern       | string  | no       | -                   |
 
 ### host [string]
 
@@ -226,6 +227,10 @@ Source plugin common parameters, please refer to [Source 
Common Options](common-
 
 Reader the sheet of the workbook,Only used when file_format is excel.
 
+### file_filter_pattern [string]
+
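+Filter pattern used to select which files are read. It is matched against each file's name (not its full path), and only files whose whole name matches are read; for example, `e2e_filter.*` reads only files whose names start with `e2e_filter`.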
+
 ## Example
 
 ```hocon
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
index 9864fc3750..57d2ceca6e 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
@@ -79,12 +79,7 @@ public abstract class BaseHdfsFileSource extends 
BaseFileSource {
             throw new FileConnectorException(
                     FileConnectorErrorCode.FILE_LIST_GET_FAILED, errorMsg, e);
         }
-        if (filePaths.isEmpty()) {
-            throw new FileConnectorException(
-                    FileConnectorErrorCode.FILE_LIST_EMPTY,
-                    "The target file list is empty,"
-                            + "SeaTunnel will not be able to sync empty 
table");
-        }
+
         // support user-defined schema
         FileFormat fileFormat =
                 FileFormat.valueOf(
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java
index fa65628bd5..7b1e32d1da 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java
@@ -112,4 +112,11 @@ public class BaseSourceConfig {
                     .stringType()
                     .noDefaultValue()
                     .withDescription("To be read sheet name,only valid for 
excel files");
+
+    public static final Option<String> FILE_FILTER_PATTERN =
+            Options.key("file_filter_pattern")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "File pattern. The connector will filter some 
files base on the pattern.");
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
index ea6c902c05..e4e1694f30 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
@@ -24,6 +24,8 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
 import 
org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
 
 import org.apache.hadoop.conf.Configuration;
@@ -43,6 +45,9 @@ import java.util.Arrays;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import static org.apache.parquet.avro.AvroReadSupport.READ_INT96_AS_FIXED;
 import static 
org.apache.parquet.avro.AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS;
@@ -74,6 +79,8 @@ public abstract class AbstractReadStrategy implements 
ReadStrategy {
     protected long skipHeaderNumber = 
BaseSourceConfig.SKIP_HEADER_ROW_NUMBER.defaultValue();
     protected transient boolean isKerberosAuthorization = false;
 
+    protected Pattern pattern;
+
     @Override
     public void init(HadoopConf conf) {
         this.hadoopConf = conf;
@@ -126,7 +133,7 @@ public abstract class AbstractReadStrategy implements 
ReadStrategy {
                 fileNames.addAll(getFileNamesByPath(hadoopConf, 
fileStatus.getPath().toString()));
                 continue;
             }
-            if (fileStatus.isFile()) {
+            if (fileStatus.isFile() && filterFileByPattern(fileStatus)) {
                 // filter '_SUCCESS' file
                 if (!fileStatus.getPath().getName().equals("_SUCCESS")
                         && !fileStatus.getPath().getName().startsWith(".")) {
@@ -146,6 +153,15 @@ public abstract class AbstractReadStrategy implements 
ReadStrategy {
                 }
             }
         }
+
+        if (fileNames.isEmpty()) {
+            throw new FileConnectorException(
+                    FileConnectorErrorCode.FILE_LIST_EMPTY,
+                    "The target file list is empty,"
+                            + "SeaTunnel will not be able to sync empty table, 
"
+                            + "please check the configuration parameters such 
as: [file_filter_pattern]");
+        }
+
         return fileNames;
     }
 
@@ -166,6 +182,11 @@ public abstract class AbstractReadStrategy implements 
ReadStrategy {
         if (pluginConfig.hasPath(BaseSourceConfig.READ_COLUMNS.key())) {
             
readColumns.addAll(pluginConfig.getStringList(BaseSourceConfig.READ_COLUMNS.key()));
         }
+        if (pluginConfig.hasPath(BaseSourceConfig.FILE_FILTER_PATTERN.key())) {
+            String filterPattern =
+                    
pluginConfig.getString(BaseSourceConfig.FILE_FILTER_PATTERN.key());
+            this.pattern = 
Pattern.compile(Matcher.quoteReplacement(filterPattern));
+        }
     }
 
     @Override
@@ -214,4 +235,11 @@ public abstract class AbstractReadStrategy implements 
ReadStrategy {
         // return merge row type
         return new SeaTunnelRowType(newFieldNames, newFieldTypes);
     }
+
+    protected boolean filterFileByPattern(FileStatus fileStatus) {
+        if (Objects.nonNull(pattern)) {
+            return pattern.matcher(fileStatus.getPath().getName()).matches();
+        }
+        return true;
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSourceFactory.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSourceFactory.java
index d0b781f1a1..496e9277f4 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSourceFactory.java
@@ -60,6 +60,7 @@ public class CosFileSourceFactory implements 
TableSourceFactory {
                 .optional(BaseSourceConfig.DATE_FORMAT)
                 .optional(BaseSourceConfig.DATETIME_FORMAT)
                 .optional(BaseSourceConfig.TIME_FORMAT)
+                .optional(BaseSourceConfig.FILE_FILTER_PATTERN)
                 .build();
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java
index d2d11da5b4..4ab637c434 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java
@@ -60,6 +60,7 @@ public class FtpFileSourceFactory implements 
TableSourceFactory {
                 .optional(BaseSourceConfig.DATE_FORMAT)
                 .optional(BaseSourceConfig.DATETIME_FORMAT)
                 .optional(BaseSourceConfig.TIME_FORMAT)
+                .optional(BaseSourceConfig.FILE_FILTER_PATTERN)
                 .build();
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java
index d4c1738490..c3d406d62c 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java
@@ -57,6 +57,7 @@ public class HdfsFileSourceFactory implements 
TableSourceFactory {
                 .optional(BaseSourceConfig.DATE_FORMAT)
                 .optional(BaseSourceConfig.DATETIME_FORMAT)
                 .optional(BaseSourceConfig.TIME_FORMAT)
+                .optional(BaseSourceConfig.FILE_FILTER_PATTERN)
                 .build();
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
index 5025676762..eaea7bccb6 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
@@ -60,6 +60,7 @@ public class OssFileSourceFactory implements 
TableSourceFactory {
                 .optional(BaseSourceConfig.DATE_FORMAT)
                 .optional(BaseSourceConfig.DATETIME_FORMAT)
                 .optional(BaseSourceConfig.TIME_FORMAT)
+                .optional(BaseSourceConfig.FILE_FILTER_PATTERN)
                 .build();
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
index 4ae2ae3a9b..03ec8660ce 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
@@ -56,6 +56,7 @@ public class LocalFileSourceFactory implements 
TableSourceFactory {
                 .optional(BaseSourceConfig.DATE_FORMAT)
                 .optional(BaseSourceConfig.DATETIME_FORMAT)
                 .optional(BaseSourceConfig.TIME_FORMAT)
+                .optional(BaseSourceConfig.FILE_FILTER_PATTERN)
                 .build();
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
index c6a2d70409..e7d862bd44 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
@@ -60,6 +60,7 @@ public class OssFileSourceFactory implements 
TableSourceFactory {
                 .optional(BaseSourceConfig.DATE_FORMAT)
                 .optional(BaseSourceConfig.DATETIME_FORMAT)
                 .optional(BaseSourceConfig.TIME_FORMAT)
+                .optional(BaseSourceConfig.FILE_FILTER_PATTERN)
                 .build();
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java
index 71156a21b6..a3b4808865 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java
@@ -65,6 +65,7 @@ public class S3FileSourceFactory implements 
TableSourceFactory {
                 .optional(BaseSourceConfig.DATE_FORMAT)
                 .optional(BaseSourceConfig.DATETIME_FORMAT)
                 .optional(BaseSourceConfig.TIME_FORMAT)
+                .optional(BaseSourceConfig.FILE_FILTER_PATTERN)
                 .build();
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java
index 18cda2fbe5..e9efe1cdf9 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java
@@ -60,6 +60,7 @@ public class SftpFileSourceFactory implements 
TableSourceFactory {
                 .optional(BaseSourceConfig.DATE_FORMAT)
                 .optional(BaseSourceConfig.DATETIME_FORMAT)
                 .optional(BaseSourceConfig.TIME_FORMAT)
+                .optional(BaseSourceConfig.FILE_FILTER_PATTERN)
                 .build();
     }
 
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java
index 5fc0e48609..15a58ebf08 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java
@@ -21,23 +21,20 @@ import org.apache.seatunnel.e2e.common.TestResource;
 import org.apache.seatunnel.e2e.common.TestSuiteBase;
 import org.apache.seatunnel.e2e.common.container.EngineType;
 import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.container.TestHelper;
 import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
 import org.apache.seatunnel.e2e.common.util.ContainerUtil;
 
 import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.TestTemplate;
-import org.testcontainers.containers.Container;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.lifecycle.Startables;
-import org.testcontainers.utility.MountableFile;
 
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
-import java.nio.file.Path;
 import java.util.Collections;
 import java.util.stream.Stream;
 
@@ -87,19 +84,26 @@ public class FtpFileIT extends TestSuiteBase implements 
TestResource {
         Startables.deepStart(Stream.of(ftpContainer)).join();
         log.info("ftp container started");
 
-        Path jsonPath = 
ContainerUtil.getResourcesFile("/json/e2e.json").toPath();
-        Path textPath = 
ContainerUtil.getResourcesFile("/text/e2e.txt").toPath();
-        Path excelPath = 
ContainerUtil.getResourcesFile("/excel/e2e.xlsx").toPath();
-
-        ftpContainer.copyFileToContainer(
-                MountableFile.forHostPath(jsonPath),
-                
"/home/vsftpd/seatunnel/tmp/seatunnel/read/json/name=tyrantlucifer/hobby=coding/e2e.json");
-        ftpContainer.copyFileToContainer(
-                MountableFile.forHostPath(textPath),
-                
"/home/vsftpd/seatunnel/tmp/seatunnel/read/text/name=tyrantlucifer/hobby=coding/e2e.txt");
-        ftpContainer.copyFileToContainer(
-                MountableFile.forHostPath(excelPath),
-                
"/home/vsftpd/seatunnel/tmp/seatunnel/read/excel/name=tyrantlucifer/hobby=coding/e2e.xlsx");
+        ContainerUtil.copyFileIntoContainers(
+                "/json/e2e.json",
+                
"/home/vsftpd/seatunnel/tmp/seatunnel/read/json/name=tyrantlucifer/hobby=coding/e2e.json",
+                ftpContainer);
+
+        ContainerUtil.copyFileIntoContainers(
+                "/text/e2e.txt",
+                
"/home/vsftpd/seatunnel/tmp/seatunnel/read/text/name=tyrantlucifer/hobby=coding/e2e.txt",
+                ftpContainer);
+
+        ContainerUtil.copyFileIntoContainers(
+                "/excel/e2e.xlsx",
+                
"/home/vsftpd/seatunnel/tmp/seatunnel/read/excel/name=tyrantlucifer/hobby=coding/e2e.xlsx",
+                ftpContainer);
+
+        ContainerUtil.copyFileIntoContainers(
+                "/excel/e2e.xlsx",
+                
"/home/vsftpd/seatunnel/tmp/seatunnel/read/excel_filter/name=tyrantlucifer/hobby=coding/e2e_filter.xlsx",
+                ftpContainer);
+
         ftpContainer.execInContainer("sh", "-c", "chmod -R 777 
/home/vsftpd/seatunnel/");
         ftpContainer.execInContainer("sh", "-c", "chown -R ftp:ftp 
/home/vsftpd/seatunnel/");
     }
@@ -107,51 +111,31 @@ public class FtpFileIT extends TestSuiteBase implements 
TestResource {
     @TestTemplate
     public void testFtpFileReadAndWrite(TestContainer container)
             throws IOException, InterruptedException {
+        TestHelper helper = new TestHelper(container);
         // test write ftp excel file
-        Container.ExecResult excelWriteResult =
-                container.executeJob("/excel/fake_source_to_ftp_excel.conf");
-        Assertions.assertEquals(0, excelWriteResult.getExitCode(), 
excelWriteResult.getStderr());
+        helper.execute("/excel/fake_source_to_ftp_excel.conf");
         // test read ftp excel file
-        Container.ExecResult excelReadResult =
-                container.executeJob("/excel/ftp_excel_to_assert.conf");
-        Assertions.assertEquals(0, excelReadResult.getExitCode(), 
excelReadResult.getStderr());
+        helper.execute("/excel/ftp_excel_to_assert.conf");
         // test read ftp excel file with projection
-        Container.ExecResult excelProjectionReadResult =
-                
container.executeJob("/excel/ftp_excel_projection_to_assert.conf");
-        Assertions.assertEquals(
-                0, excelProjectionReadResult.getExitCode(), 
excelProjectionReadResult.getStderr());
+        helper.execute("/excel/ftp_excel_projection_to_assert.conf");
+        // test read ftp excel file with filter
+        helper.execute("/excel/ftp_filter_excel_to_assert.conf");
         // test write ftp text file
-        Container.ExecResult textWriteResult =
-                container.executeJob("/text/fake_to_ftp_file_text.conf");
-        Assertions.assertEquals(0, textWriteResult.getExitCode());
+        helper.execute("/text/fake_to_ftp_file_text.conf");
         // test read skip header
-        Container.ExecResult textWriteAndSkipResult =
-                container.executeJob("/text/ftp_file_text_skip_headers.conf");
-        Assertions.assertEquals(0, textWriteAndSkipResult.getExitCode());
+        helper.execute("/text/ftp_file_text_skip_headers.conf");
         // test read ftp text file
-        Container.ExecResult textReadResult =
-                container.executeJob("/text/ftp_file_text_to_assert.conf");
-        Assertions.assertEquals(0, textReadResult.getExitCode());
+        helper.execute("/text/ftp_file_text_to_assert.conf");
         // test read ftp text file with projection
-        Container.ExecResult textProjectionResult =
-                
container.executeJob("/text/ftp_file_text_projection_to_assert.conf");
-        Assertions.assertEquals(0, textProjectionResult.getExitCode());
+        helper.execute("/text/ftp_file_text_projection_to_assert.conf");
         // test write ftp json file
-        Container.ExecResult jsonWriteResult =
-                container.executeJob("/json/fake_to_ftp_file_json.conf");
-        Assertions.assertEquals(0, jsonWriteResult.getExitCode());
+        helper.execute("/json/fake_to_ftp_file_json.conf");
         // test read ftp json file
-        Container.ExecResult jsonReadResult =
-                container.executeJob("/json/ftp_file_json_to_assert.conf");
-        Assertions.assertEquals(0, jsonReadResult.getExitCode());
+        helper.execute("/json/ftp_file_json_to_assert.conf");
         // test write ftp parquet file
-        Container.ExecResult parquetWriteResult =
-                container.executeJob("/parquet/fake_to_ftp_file_parquet.conf");
-        Assertions.assertEquals(0, parquetWriteResult.getExitCode());
+        helper.execute("/parquet/fake_to_ftp_file_parquet.conf");
         // test write ftp orc file
-        Container.ExecResult orcWriteResult =
-                container.executeJob("/orc/fake_to_ftp_file_orc.conf");
-        Assertions.assertEquals(0, orcWriteResult.getExitCode());
+        helper.execute("/orc/fake_to_ftp_file_orc.conf");
     }
 
     @AfterAll
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/excel/ftp_filter_excel_to_assert.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/excel/ftp_filter_excel_to_assert.conf
new file mode 100644
index 0000000000..6af42f6f3d
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/excel/ftp_filter_excel_to_assert.conf
@@ -0,0 +1,141 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  # You can set flink configuration here
+  execution.parallelism = 1
+  job.mode = "BATCH"
+
+  # You can set spark configuration here
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 1
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  FtpFile {
+    host = "ftp"
+    port = 21
+    user = seatunnel
+    password = pass
+    path = "/tmp/seatunnel/read/excel_filter"
+    result_table_name = "ftp"
+    file_format_type = excel
+    delimiter = ;
+    skip_header_row_number = 1
+    file_filter_pattern = "e2e_filter.*"
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_bytes = bytes
+        c_date = date
+        c_decimal = "decimal(38, 18)"
+        c_timestamp = timestamp
+        c_row = {
+          c_map = "map<string, string>"
+          c_array = "array<int>"
+          c_string = string
+          c_boolean = boolean
+          c_tinyint = tinyint
+          c_smallint = smallint
+          c_int = int
+          c_bigint = bigint
+          c_float = float
+          c_double = double
+          c_bytes = bytes
+          c_date = date
+          c_decimal = "decimal(38, 18)"
+          c_timestamp = timestamp
+        }
+      }
+    }
+  }
+}
+
+
+sink {
+  Assert {
+    source_table_name = "ftp"
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 5
+        }
+      ],
+      field_rules = [
+        {
+          field_name = c_string
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_boolean
+          field_type = boolean
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_double
+          field_type = double
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = name
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = hobby
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
+
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
index f5c220deab..aed3576726 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
@@ -21,17 +21,14 @@ import org.apache.seatunnel.e2e.common.TestSuiteBase;
 import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
 import org.apache.seatunnel.e2e.common.container.TestContainer;
 import org.apache.seatunnel.e2e.common.container.TestContainerId;
+import org.apache.seatunnel.e2e.common.container.TestHelper;
 import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
 import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
 import org.apache.seatunnel.e2e.common.util.ContainerUtil;
 
-import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.TestTemplate;
-import org.testcontainers.containers.Container;
-import org.testcontainers.utility.MountableFile;
 
 import java.io.IOException;
-import java.nio.file.Path;
 
 @DisabledOnContainer(
         value = {TestContainerId.SPARK_2_4},
@@ -43,88 +40,70 @@ public class LocalFileIT extends TestSuiteBase {
     @TestContainerExtension
     private final ContainerExtendedFactory extendedFactory =
             container -> {
-                Path jsonPath = ContainerUtil.getResourcesFile("/json/e2e.json").toPath();
-                Path orcPath = ContainerUtil.getResourcesFile("/orc/e2e.orc").toPath();
-                Path parquetPath = ContainerUtil.getResourcesFile("/parquet/e2e.parquet").toPath();
-                Path textPath = ContainerUtil.getResourcesFile("/text/e2e.txt").toPath();
-                Path excelPath = ContainerUtil.getResourcesFile("/excel/e2e.xlsx").toPath();
-                container.copyFileToContainer(
-                        MountableFile.forHostPath(jsonPath),
-                        "/seatunnel/read/json/name=tyrantlucifer/hobby=coding/e2e.json");
-                container.copyFileToContainer(
-                        MountableFile.forHostPath(orcPath),
-                        "/seatunnel/read/orc/name=tyrantlucifer/hobby=coding/e2e.orc");
-                container.copyFileToContainer(
-                        MountableFile.forHostPath(parquetPath),
-                        "/seatunnel/read/parquet/name=tyrantlucifer/hobby=coding/e2e.parquet");
-                container.copyFileToContainer(
-                        MountableFile.forHostPath(textPath),
-                        "/seatunnel/read/text/name=tyrantlucifer/hobby=coding/e2e.txt");
-                container.copyFileToContainer(
-                        MountableFile.forHostPath(excelPath),
-                        "/seatunnel/read/excel/name=tyrantlucifer/hobby=coding/e2e.xlsx");
+                ContainerUtil.copyFileIntoContainers(
+                        "/json/e2e.json",
+                        "/seatunnel/read/json/name=tyrantlucifer/hobby=coding/e2e.json",
+                        container);
+
+                ContainerUtil.copyFileIntoContainers(
+                        "/text/e2e.txt",
+                        "/seatunnel/read/text/name=tyrantlucifer/hobby=coding/e2e.txt",
+                        container);
+
+                ContainerUtil.copyFileIntoContainers(
+                        "/excel/e2e.xlsx",
+                        "/seatunnel/read/excel/name=tyrantlucifer/hobby=coding/e2e.xlsx",
+                        container);
+
+                ContainerUtil.copyFileIntoContainers(
+                        "/orc/e2e.orc",
+                        "/seatunnel/read/orc/name=tyrantlucifer/hobby=coding/e2e.orc",
+                        container);
+
+                ContainerUtil.copyFileIntoContainers(
+                        "/parquet/e2e.parquet",
+                        "/seatunnel/read/parquet/name=tyrantlucifer/hobby=coding/e2e.parquet",
+                        container);
+
+                ContainerUtil.copyFileIntoContainers(
+                        "/excel/e2e.xlsx",
+                        "/seatunnel/read/excel_filter/name=tyrantlucifer/hobby=coding/e2e_filter.xlsx",
+                        container);
             };
 
     @TestTemplate
     public void testLocalFileReadAndWrite(TestContainer container)
             throws IOException, InterruptedException {
-        Container.ExecResult excelWriteResult =
-                container.executeJob("/excel/fake_to_local_excel.conf");
-        Assertions.assertEquals(0, excelWriteResult.getExitCode(), excelWriteResult.getStderr());
-        Container.ExecResult excelReadResult =
-                container.executeJob("/excel/local_excel_to_assert.conf");
-        Assertions.assertEquals(0, excelReadResult.getExitCode(), excelReadResult.getStderr());
-        Container.ExecResult excelProjectionReadResult =
-                container.executeJob("/excel/local_excel_projection_to_assert.conf");
-        Assertions.assertEquals(
-                0, excelProjectionReadResult.getExitCode(), excelProjectionReadResult.getStderr());
+        TestHelper helper = new TestHelper(container);
+
+        helper.execute("/excel/fake_to_local_excel.conf");
+        helper.execute("/excel/local_excel_to_assert.conf");
+        helper.execute("/excel/local_excel_projection_to_assert.conf");
         // test write local text file
-        Container.ExecResult textWriteResult =
-                container.executeJob("/text/fake_to_local_file_text.conf");
-        Assertions.assertEquals(0, textWriteResult.getExitCode());
+        helper.execute("/text/fake_to_local_file_text.conf");
         // test read skip header
-        Container.ExecResult textWriteAndSkipResult =
-                container.executeJob("/text/local_file_text_skip_headers.conf");
-        Assertions.assertEquals(0, textWriteAndSkipResult.getExitCode());
+        helper.execute("/text/local_file_text_skip_headers.conf");
         // test read local text file
-        Container.ExecResult textReadResult =
-                container.executeJob("/text/local_file_text_to_assert.conf");
-        Assertions.assertEquals(0, textReadResult.getExitCode());
+        helper.execute("/text/local_file_text_to_assert.conf");
         // test read local text file with projection
-        Container.ExecResult textProjectionResult =
-                container.executeJob("/text/local_file_text_projection_to_assert.conf");
-        Assertions.assertEquals(0, textProjectionResult.getExitCode());
+        helper.execute("/text/local_file_text_projection_to_assert.conf");
         // test write local json file
-        Container.ExecResult jsonWriteResult =
-                container.executeJob("/json/fake_to_local_file_json.conf");
-        Assertions.assertEquals(0, jsonWriteResult.getExitCode());
+        helper.execute("/json/fake_to_local_file_json.conf");
         // test read local json file
-        Container.ExecResult jsonReadResult =
-                container.executeJob("/json/local_file_json_to_assert.conf");
-        Assertions.assertEquals(0, jsonReadResult.getExitCode());
+        helper.execute("/json/local_file_json_to_assert.conf");
         // test write local orc file
-        Container.ExecResult orcWriteResult =
-                container.executeJob("/orc/fake_to_local_file_orc.conf");
-        Assertions.assertEquals(0, orcWriteResult.getExitCode());
+        helper.execute("/orc/fake_to_local_file_orc.conf");
         // test read local orc file
-        Container.ExecResult orcReadResult =
-                container.executeJob("/orc/local_file_orc_to_assert.conf");
-        Assertions.assertEquals(0, orcReadResult.getExitCode());
+        helper.execute("/orc/local_file_orc_to_assert.conf");
         // test read local orc file with projection
-        Container.ExecResult orcProjectionResult =
-                container.executeJob("/orc/local_file_orc_projection_to_assert.conf");
-        Assertions.assertEquals(0, orcProjectionResult.getExitCode());
+        helper.execute("/orc/local_file_orc_projection_to_assert.conf");
         // test write local parquet file
-        Container.ExecResult parquetWriteResult =
-                container.executeJob("/parquet/fake_to_local_file_parquet.conf");
-        Assertions.assertEquals(0, parquetWriteResult.getExitCode());
+        helper.execute("/parquet/fake_to_local_file_parquet.conf");
         // test read local parquet file
-        Container.ExecResult parquetReadResult =
-                container.executeJob("/parquet/local_file_parquet_to_assert.conf");
-        Assertions.assertEquals(0, parquetReadResult.getExitCode());
+        helper.execute("/parquet/local_file_parquet_to_assert.conf");
         // test read local parquet file with projection
-        Container.ExecResult parquetProjectionResult =
-                container.executeJob("/parquet/local_file_parquet_projection_to_assert.conf");
-        Assertions.assertEquals(0, parquetProjectionResult.getExitCode());
+        helper.execute("/parquet/local_file_parquet_projection_to_assert.conf");
+        // test read filtered local file
+        helper.execute("/excel/local_filter_excel_to_assert.conf");
     }
 }
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/excel/local_filter_excel_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/excel/local_filter_excel_to_assert.conf
new file mode 100644
index 0000000000..86039b44db
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/excel/local_filter_excel_to_assert.conf
@@ -0,0 +1,131 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 1
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+  job.mode = "BATCH"
+}
+
+source {
+  LocalFile {
+    path = "/seatunnel/read/excel_filter"
+    result_table_name = "fake"
+    file_format_type = excel
+    delimiter = ;
+    skip_header_row_number = 1
+    file_filter_pattern = "e2e_filter.*"
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_bytes = bytes
+        c_date = date
+        c_decimal = "decimal(38, 18)"
+        c_timestamp = timestamp
+        c_row = {
+          c_map = "map<string, string>"
+          c_array = "array<int>"
+          c_string = string
+          c_boolean = boolean
+          c_tinyint = tinyint
+          c_smallint = smallint
+          c_int = int
+          c_bigint = bigint
+          c_float = float
+          c_double = double
+          c_bytes = bytes
+          c_date = date
+          c_decimal = "decimal(38, 18)"
+          c_timestamp = timestamp
+        }
+      }
+    }
+  }
+}
+
+sink {
+  Assert {
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 5
+        }
+      ],
+      field_rules = [
+        {
+          field_name = c_string
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_boolean
+          field_type = boolean
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_double
+          field_type = double
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = name
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = hobby
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/fstp/SftpFileIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/fstp/SftpFileIT.java
index 82d1be73db..e5fbcb5f5e 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/fstp/SftpFileIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/fstp/SftpFileIT.java
@@ -21,22 +21,19 @@ import org.apache.seatunnel.e2e.common.TestResource;
 import org.apache.seatunnel.e2e.common.TestSuiteBase;
 import org.apache.seatunnel.e2e.common.container.TestContainer;
 import org.apache.seatunnel.e2e.common.container.TestContainerId;
+import org.apache.seatunnel.e2e.common.container.TestHelper;
 import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
 import org.apache.seatunnel.e2e.common.util.ContainerUtil;
 
 import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.TestTemplate;
-import org.testcontainers.containers.Container;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.lifecycle.Startables;
-import org.testcontainers.utility.MountableFile;
 
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
-import java.nio.file.Path;
 import java.util.Collections;
 import java.util.stream.Stream;
 
@@ -75,61 +72,54 @@ public class SftpFileIT extends TestSuiteBase implements TestResource {
         sftpContainer.start();
         Startables.deepStart(Stream.of(sftpContainer)).join();
         log.info("Sftp container started");
-        Path jsonPath = ContainerUtil.getResourcesFile("/json/e2e.json").toPath();
-        Path textPath = ContainerUtil.getResourcesFile("/text/e2e.txt").toPath();
-        Path excelPath = ContainerUtil.getResourcesFile("/excel/e2e.xlsx").toPath();
-        sftpContainer.copyFileToContainer(
-                MountableFile.forHostPath(jsonPath),
-                "/home/seatunnel/tmp/seatunnel/read/json/name=tyrantlucifer/hobby=coding/e2e.json");
-        sftpContainer.copyFileToContainer(
-                MountableFile.forHostPath(textPath),
-                "/home/seatunnel/tmp/seatunnel/read/text/name=tyrantlucifer/hobby=coding/e2e.txt");
-        sftpContainer.copyFileToContainer(
-                MountableFile.forHostPath(excelPath),
-                "/home/seatunnel/tmp/seatunnel/read/excel/name=tyrantlucifer/hobby=coding/e2e.xlsx");
+
+        ContainerUtil.copyFileIntoContainers(
+                "/json/e2e.json",
+                "/home/seatunnel/tmp/seatunnel/read/json/name=tyrantlucifer/hobby=coding/e2e.json",
+                sftpContainer);
+
+        ContainerUtil.copyFileIntoContainers(
+                "/text/e2e.txt",
+                "/home/seatunnel/tmp/seatunnel/read/text/name=tyrantlucifer/hobby=coding/e2e.txt",
+                sftpContainer);
+
+        ContainerUtil.copyFileIntoContainers(
+                "/excel/e2e.xlsx",
+                "/home/seatunnel/tmp/seatunnel/read/excel/name=tyrantlucifer/hobby=coding/e2e.xlsx",
+                sftpContainer);
+
+        ContainerUtil.copyFileIntoContainers(
+                "/excel/e2e.xlsx",
+                "/home/seatunnel/tmp/seatunnel/read/excel_filter/name=tyrantlucifer/hobby=coding/e2e_filter.xlsx",
+                sftpContainer);
+
         sftpContainer.execInContainer("sh", "-c", "chown -R seatunnel /home/seatunnel/tmp/");
     }
 
     @TestTemplate
     public void testSftpFileReadAndWrite(TestContainer container)
             throws IOException, InterruptedException {
+        TestHelper helper = new TestHelper(container);
         // test write sftp excel file
-        Container.ExecResult excelWriteResult =
-                container.executeJob("/excel/fakesource_to_sftp_excel.conf");
-        Assertions.assertEquals(0, excelWriteResult.getExitCode(), excelWriteResult.getStderr());
+        helper.execute("/excel/fakesource_to_sftp_excel.conf");
         // test read sftp excel file
-        Container.ExecResult excelReadResult =
-                container.executeJob("/excel/sftp_excel_to_assert.conf");
-        Assertions.assertEquals(0, excelReadResult.getExitCode(), excelReadResult.getStderr());
+        helper.execute("/excel/sftp_excel_to_assert.conf");
         // test read sftp excel file with projection
-        Container.ExecResult excelProjectionReadResult =
-                container.executeJob("/excel/sftp_excel_projection_to_assert.conf");
-        Assertions.assertEquals(
-                0, excelProjectionReadResult.getExitCode(), excelProjectionReadResult.getStderr());
+        helper.execute("/excel/sftp_excel_projection_to_assert.conf");
+        // test read sftp excel file with filter pattern
+        helper.execute("/excel/sftp_filter_excel_to_assert.conf");
         // test write sftp text file
-        Container.ExecResult textWriteResult =
-                container.executeJob("/text/fake_to_sftp_file_text.conf");
-        Assertions.assertEquals(0, textWriteResult.getExitCode());
+        helper.execute("/text/fake_to_sftp_file_text.conf");
         // test read skip header
-        Container.ExecResult textWriteAndSkipResult =
-                container.executeJob("/text/sftp_file_text_skip_headers.conf");
-        Assertions.assertEquals(0, textWriteAndSkipResult.getExitCode());
+        helper.execute("/text/sftp_file_text_skip_headers.conf");
         // test read sftp text file
-        Container.ExecResult textReadResult =
-                container.executeJob("/text/sftp_file_text_to_assert.conf");
-        Assertions.assertEquals(0, textReadResult.getExitCode());
+        helper.execute("/text/sftp_file_text_to_assert.conf");
         // test read sftp text file with projection
-        Container.ExecResult textProjectionResult =
-                container.executeJob("/text/sftp_file_text_projection_to_assert.conf");
-        Assertions.assertEquals(0, textProjectionResult.getExitCode());
+        helper.execute("/text/sftp_file_text_projection_to_assert.conf");
         // test write sftp json file
-        Container.ExecResult jsonWriteResult =
-                container.executeJob("/json/fake_to_sftp_file_json.conf");
-        Assertions.assertEquals(0, jsonWriteResult.getExitCode());
+        helper.execute("/json/fake_to_sftp_file_json.conf");
         // test read sftp json file
-        Container.ExecResult jsonReadResult =
-                container.executeJob("/json/sftp_file_json_to_assert.conf");
-        Assertions.assertEquals(0, jsonReadResult.getExitCode());
+        helper.execute("/json/sftp_file_json_to_assert.conf");
     }
 
     @AfterAll
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/excel/sftp_filter_excel_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/excel/sftp_filter_excel_to_assert.conf
new file mode 100644
index 0000000000..b6cd92f712
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/excel/sftp_filter_excel_to_assert.conf
@@ -0,0 +1,132 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  # You can set flink configuration here
+  execution.parallelism = 1
+  job.mode = "BATCH"
+
+  # You can set spark configuration here
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 1
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  SftpFile {
+    path = "tmp/seatunnel/read/excel_filter"
+    result_table_name = "sftp"
+    file_format_type = excel
+    host = "sftp"
+    port = 22
+    user = seatunnel
+    password = pass
+    delimiter = ";"
+    file_filter_pattern = "e2e_filter.*"
+    skip_header_row_number = 1
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_bytes = bytes
+        c_date = date
+        c_decimal = "decimal(38, 18)"
+        c_timestamp = timestamp
+        c_row = {
+          c_map = "map<string, string>"
+          c_array = "array<int>"
+          c_string = string
+          c_boolean = boolean
+          c_tinyint = tinyint
+          c_smallint = smallint
+          c_int = int
+          c_bigint = bigint
+          c_float = float
+          c_double = double
+          c_bytes = bytes
+          c_date = date
+          c_decimal = "decimal(38, 18)"
+          c_timestamp = timestamp
+        }
+      }
+    }
+  }
+}
+
+sink {
+  Assert {
+    source_table_name = "sftp"
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 5
+        }
+      ],
+      field_rules = [
+        {
+          field_name = c_string
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_boolean
+          field_type = boolean
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_double
+          field_type = double
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
+
+
+
+
+
+
+
+
+
+
+
diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestHelper.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestHelper.java
new file mode 100644
index 0000000000..a88723f820
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestHelper.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.common.container;
+
+import org.junit.jupiter.api.Assertions;
+import org.testcontainers.containers.Container;
+
+import java.io.IOException;
+
+public class TestHelper {
+    private final TestContainer container;
+
+    public TestHelper(TestContainer container) {
+        this.container = container;
+    }
+
+    public void execute(String file) throws IOException, InterruptedException {
+        execute(0, file);
+    }
+
+    public void execute(int exceptResult, String file) throws IOException, InterruptedException {
+        Container.ExecResult result = container.executeJob(file);
+        Assertions.assertEquals(exceptResult, result.getExitCode(), result.getStderr());
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java
index 92d6100a7c..fa5660a170 100644
--- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java
+++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java
@@ -246,4 +246,10 @@ public final class ContainerUtil {
             throw new FactoryException("Could not load service provider for containers.", e);
         }
     }
+
+    public static void copyFileIntoContainers(
+            String fileName, String targetPath, GenericContainer<?> container) {
+        Path path = getResourcesFile(fileName).toPath();
+        container.copyFileToContainer(MountableFile.forHostPath(path), targetPath);
+    }
 }

Reply via email to