This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new fd515b871d [Fix][Connector-V2] Fix 'file_filterpattern' cannot filter
directories (#10049)
fd515b871d is described below
commit fd515b871d1797169f454d44d6a3caca4dc82991
Author: LiJie20190102 <[email protected]>
AuthorDate: Fri Dec 5 01:09:02 2025 +0800
[Fix][Connector-V2] Fix 'file_filterpattern' cannot filter directories
(#10049)
---
docs/en/connector-v2/source/CosFile.md | 12 +-
docs/en/connector-v2/source/FtpFile.md | 13 +-
docs/en/connector-v2/source/HdfsFile.md | 12 +-
docs/en/connector-v2/source/Http.md | 6 +-
docs/en/connector-v2/source/LocalFile.md | 14 +-
docs/en/connector-v2/source/OssFile.md | 12 +-
docs/en/connector-v2/source/OssJindoFile.md | 10 +-
docs/en/connector-v2/source/S3File.md | 12 +-
docs/en/connector-v2/source/SftpFile.md | 12 +-
docs/zh/connector-v2/source/CosFile.md | 12 +-
docs/zh/connector-v2/source/FtpFile.md | 14 +-
docs/zh/connector-v2/source/HdfsFile.md | 12 +-
docs/zh/connector-v2/source/Http.md | 6 +-
docs/zh/connector-v2/source/LocalFile.md | 12 +-
docs/zh/connector-v2/source/OssFile.md | 12 +-
docs/zh/connector-v2/source/S3File.md | 12 +-
docs/zh/connector-v2/source/SftpFile.md | 12 +-
.../file/source/reader/AbstractReadStrategy.java | 16 ++-
.../file/reader/FileFilterPatternTest.java | 143 ++++++++++++++++++++
.../filter-pattern/json/json2024/202401.json | 1 +
.../filter-pattern/json/json2025/202501.json | 1 +
.../json/json2025/test_read_json.conf | 39 ++++++
.../test/resources/filter-pattern/json/people.json | 1 +
.../file/ftp/system/SeaTunnelFTPFileSystem.java | 10 +-
.../e2e/connector/file/ftp/FtpFileIT.java | 42 ++++++
.../json/ftp_to_access_for_json_name_filter.conf | 75 +++++++++++
.../json/ftp_to_access_for_json_path_filter.conf | 75 +++++++++++
.../e2e/connector/file/oss/OssFileIT.java | 40 ++++++
.../json/oss_to_access_for_json_name_filter.conf | 80 +++++++++++
.../json/oss_to_access_for_json_path_filter.conf | 80 +++++++++++
.../connector-file-s3-e2e/pom.xml | 19 +--
.../seatunnel/e2e/connector/file/s3/S3FileIT.java | 79 +++++------
.../e2e/connector/file/s3/S3FileWithFilterIT.java | 146 +++++++++++++++++++++
.../file/s3/S3FileWithMultipleTableIT.java | 47 +++----
.../seatunnel/e2e/connector/file/s3/S3Utils.java | 40 +++---
.../json/s3_to_access_for_json_name_filter.conf | 78 +++++++++++
.../json/s3_to_access_for_json_path_filter.conf | 78 +++++++++++
.../e2e/connector/file/fstp/SftpFileIT.java | 40 ++++++
.../json/sftp_to_access_for_json_name_filter.conf | 81 ++++++++++++
.../json/sftp_to_access_for_json_path_filter.conf | 81 ++++++++++++
40 files changed, 1291 insertions(+), 196 deletions(-)
diff --git a/docs/en/connector-v2/source/CosFile.md
b/docs/en/connector-v2/source/CosFile.md
index ecdb87cac4..819b867ec8 100644
--- a/docs/en/connector-v2/source/CosFile.md
+++ b/docs/en/connector-v2/source/CosFile.md
@@ -313,12 +313,12 @@ Whether to use the header line to parse the file, only
used when the file_format
### file_filter_pattern [string]
-Filter pattern, which used for filtering files.
+Filter pattern, which is used for filtering files. If you only want to filter by file name, simply write a regular expression for the file name; if you also want to filter by the file directory, the expression needs to start with the configured `path`.
The pattern follows standard regular expressions. For details, please refer to
https://en.wikipedia.org/wiki/Regular_expression.
Here are some examples.
-File Structure Example:
+If the `path` is `/data/seatunnel` and the file structure is as follows:
```
/data/seatunnel/20241001/report.txt
/data/seatunnel/20241007/abch202410.csv
@@ -330,7 +330,7 @@ Matching Rules Example:
**Example 1**: *Match all .txt files*, Regular Expression:
```
-/data/seatunnel/20241001/.*\.txt
+.*.txt
```
The result of this example matching is:
```
@@ -338,14 +338,14 @@ The result of this example matching is:
```
**Example 2**: *Match all files starting with abc*, Regular Expression:
```
-/data/seatunnel/20241002/abc.*
+abc.*
```
The result of this example matching is:
```
/data/seatunnel/20241007/abch202410.csv
/data/seatunnel/20241002/abcg202410.csv
```
-**Example 3**: *Match all file starting with abc,And the fourth character is
either h or g*, the Regular Expression:
+**Example 3**: *Match all files starting with abc in folder 20241007, and the fourth character is either h or g*, the Regular Expression:
```
/data/seatunnel/20241007/abc[h,g].*
```
@@ -355,7 +355,7 @@ The result of this example matching is:
```
**Example 4**: *Match third level folders starting with 202410 and files
ending with .csv*, the Regular Expression:
```
-/data/seatunnel/202410\d*/.*\.csv
+/data/seatunnel/202410\d*/.*.csv
```
The result of this example matching is:
```
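To make the matching rules above concrete, here is a minimal, self-contained Java sketch of the documented behavior: a pattern that starts with the configured `path` is matched against the whole file path, while any other pattern is matched against the file name alone. Class and method names are illustrative, not part of the connector API.

```java
import java.util.regex.Pattern;

public class FilterPatternDemo {

    // Mirrors the documented rule: path-prefixed patterns match the full path,
    // anything else matches only the file name.
    static boolean accept(String basePath, String filterPattern, String absPath) {
        Pattern pattern = Pattern.compile(filterPattern);
        if (filterPattern.startsWith(basePath)) {
            return pattern.matcher(absPath).matches();
        }
        String fileName = absPath.substring(absPath.lastIndexOf('/') + 1);
        return pattern.matcher(fileName).matches();
    }

    public static void main(String[] args) {
        String base = "/data/seatunnel";
        // Example 1: a name-only pattern matches report.txt in any folder.
        System.out.println(accept(base, ".*\\.txt", "/data/seatunnel/20241001/report.txt")); // true
        // Example 4: a path pattern constrains the folder as well as the name.
        System.out.println(accept(base, "/data/seatunnel/202410\\d*/.*\\.csv",
                "/data/seatunnel/20241007/abch202410.csv")); // true
        System.out.println(accept(base, "/data/seatunnel/202410\\d*/.*\\.csv",
                "/data/seatunnel/20241012/logo.png")); // false
    }
}
```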
diff --git a/docs/en/connector-v2/source/FtpFile.md
b/docs/en/connector-v2/source/FtpFile.md
index cd1807043b..46389a6668 100644
--- a/docs/en/connector-v2/source/FtpFile.md
+++ b/docs/en/connector-v2/source/FtpFile.md
@@ -106,12 +106,12 @@ Whether to enable remote host verification for FTP data
channels, default is `tr
### file_filter_pattern [string]
-Filter pattern, which used for filtering files.
+Filter pattern, which is used for filtering files. If you only want to filter by file name, simply write a regular expression for the file name; if you also want to filter by the file directory, the expression needs to start with the configured `path`.
The pattern follows standard regular expressions. For details, please refer to
https://en.wikipedia.org/wiki/Regular_expression.
Here are some examples.
-File Structure Example:
+If the `path` is `/data/seatunnel` and the file structure is as follows:
```
/data/seatunnel/20241001/report.txt
@@ -126,7 +126,7 @@ Matching Rules Example:
**Example 1**: *Match all .txt files*, Regular Expression:
```
-/data/seatunnel/20241001/.*\.txt
+.*.txt
```
The result of this example matching is:
@@ -138,7 +138,7 @@ The result of this example matching is:
**Example 2**: *Match all files starting with abc*, Regular Expression:
```
-/data/seatunnel/20241002/abc.*
+abc.*
```
The result of this example matching is:
@@ -147,8 +147,7 @@ The result of this example matching is:
/data/seatunnel/20241007/abch202410.csv
/data/seatunnel/20241002/abcg202410.csv
```
-
-**Example 3**: *Match all file starting with abc,And the fourth character is
either h or g*, the Regular Expression:
+**Example 3**: *Match all files starting with abc in folder 20241007, and the fourth character is either h or g*, the Regular Expression:
```
/data/seatunnel/20241007/abc[h,g].*
@@ -163,7 +162,7 @@ The result of this example matching is:
**Example 4**: *Match third level folders starting with 202410 and files
ending with .csv*, the Regular Expression:
```
-/data/seatunnel/202410\d*/.*\.csv
+/data/seatunnel/202410\d*/.*.csv
```
The result of this example matching is:
diff --git a/docs/en/connector-v2/source/HdfsFile.md
b/docs/en/connector-v2/source/HdfsFile.md
index 1275744659..a5895112d6 100644
--- a/docs/en/connector-v2/source/HdfsFile.md
+++ b/docs/en/connector-v2/source/HdfsFile.md
@@ -118,12 +118,12 @@ default `\n`
### file_filter_pattern [string]
-Filter pattern, which used for filtering files.
+Filter pattern, which is used for filtering files. If you only want to filter by file name, simply write a regular expression for the file name; if you also want to filter by the file directory, the expression needs to start with the configured `path`.
The pattern follows standard regular expressions. For details, please refer to
https://en.wikipedia.org/wiki/Regular_expression.
Here are some examples.
-File Structure Example:
+If the `path` is `/data/seatunnel` and the file structure is as follows:
```
/data/seatunnel/20241001/report.txt
/data/seatunnel/20241007/abch202410.csv
@@ -135,7 +135,7 @@ Matching Rules Example:
**Example 1**: *Match all .txt files*, Regular Expression:
```
-/data/seatunnel/20241001/.*\.txt
+.*.txt
```
The result of this example matching is:
```
@@ -143,14 +143,14 @@ The result of this example matching is:
```
**Example 2**: *Match all files starting with abc*, Regular Expression:
```
-/data/seatunnel/20241002/abc.*
+abc.*
```
The result of this example matching is:
```
/data/seatunnel/20241007/abch202410.csv
/data/seatunnel/20241002/abcg202410.csv
```
-**Example 3**: *Match all file starting with abc,And the fourth character is
either h or g*, the Regular Expression:
+**Example 3**: *Match all files starting with abc in folder 20241007, and the fourth character is either h or g*, the Regular Expression:
```
/data/seatunnel/20241007/abc[h,g].*
```
@@ -160,7 +160,7 @@ The result of this example matching is:
```
**Example 4**: *Match third level folders starting with 202410 and files
ending with .csv*, the Regular Expression:
```
-/data/seatunnel/202410\d*/.*\.csv
+/data/seatunnel/202410\d*/.*.csv
```
The result of this example matching is:
```
diff --git a/docs/en/connector-v2/source/Http.md
b/docs/en/connector-v2/source/Http.md
index 1f0263a268..a5a5306d62 100644
--- a/docs/en/connector-v2/source/Http.md
+++ b/docs/en/connector-v2/source/Http.md
@@ -59,7 +59,7 @@ They can be downloaded via install-plugin.sh or from the
Maven central repositor
| pageing.page_type | String | No | PageNumber | This parameter is used to specify the page type; `PageNumber` is used if not set. Only `PageNumber` and `Cursor` are supported. |
| pageing.cursor_field | String | No | - | this
parameter is used to specify the Cursor field name in the request parameter.
|
| pageing.cursor_response_field | String | No | - | This
parameter specifies the field in the response from which the cursor is
retrieved.
|
-| content_json | String | No | - | This
parameter can get some json data.If you only need the data in the 'book'
section, configure `content_field = "$.store.book.*"`.
|
+| content_field | String | No | - | This parameter can extract part of the JSON data. If you only need the data in the 'book' section, configure `content_field = "$.store.book.*"`. |
| format | String | No | text | The
format of upstream data, now only support `json` `text`, default `text`.
|
| method | String | No | get | Http
request method, only supports GET, POST method.
|
| headers | Map | No | - | Http
headers.
|
@@ -221,7 +221,7 @@ The HTTP body is used to carry the actual data in requests
or responses, includi
The reference format is as follows:
```hocon
-body="{"id":1,"name":"setunnel"}"
+body="{"id":1,"name":"seatunnel"}"
```
For form submissions,please set the content-type as follows.
@@ -231,7 +231,7 @@ headers {
}
```
-### content_json
+### content_field
This parameter can extract part of the JSON data. If you only need the data in the 'book' section, configure `content_field = "$.store.book.*"`.
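For readers unfamiliar with `content_field`, the sketch below shows the kind of JSONPath extraction the option describes. It uses the Jayway json-path library purely as a stand-in; the connector's internal implementation is not shown in this diff, so treat the library choice as an assumption.

```java
import com.jayway.jsonpath.JsonPath;

import java.util.List;

public class ContentFieldDemo {
    public static void main(String[] args) {
        String response = "{\"store\":{\"book\":[{\"title\":\"A\"},{\"title\":\"B\"}]}}";
        // content_field = "$.store.book.*" keeps only the entries under 'book'.
        List<Object> books = JsonPath.read(response, "$.store.book.*");
        System.out.println(books.size()); // 2
    }
}
```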
diff --git a/docs/en/connector-v2/source/LocalFile.md
b/docs/en/connector-v2/source/LocalFile.md
index c712166eba..c9338b3cbf 100644
--- a/docs/en/connector-v2/source/LocalFile.md
+++ b/docs/en/connector-v2/source/LocalFile.md
@@ -304,12 +304,12 @@ Whether to use the header line to parse the file, only
used when the file_format
### file_filter_pattern [string]
-Filter pattern, which used for filtering files.
+Filter pattern, which is used for filtering files. If you only want to filter by file name, simply write a regular expression for the file name; if you also want to filter by the file directory, the expression needs to start with the configured `path`.
The pattern follows standard regular expressions. For details, please refer to
https://en.wikipedia.org/wiki/Regular_expression.
Here are some examples.
-File Structure Example:
+If the `path` is `/data/seatunnel` and the file structure is as follows:
```
/data/seatunnel/20241001/report.txt
/data/seatunnel/20241007/abch202410.csv
@@ -321,7 +321,7 @@ Matching Rules Example:
**Example 1**: *Match all .txt files*, Regular Expression:
```
-/data/seatunnel/20241001/.*\.txt
+.*.txt
```
The result of this example matching is:
```
@@ -329,14 +329,14 @@ The result of this example matching is:
```
**Example 2**: *Match all files starting with abc*, Regular Expression:
```
-/data/seatunnel/20241002/abc.*
+abc.*
```
The result of this example matching is:
```
/data/seatunnel/20241007/abch202410.csv
/data/seatunnel/20241002/abcg202410.csv
```
-**Example 3**: *Match all file starting with abc,And the fourth character is
either h or g*, the Regular Expression:
+**Example 3**: *Match all files starting with abc in folder 20241007, and the fourth character is either h or g*, the Regular Expression:
```
/data/seatunnel/20241007/abc[h,g].*
```
@@ -346,7 +346,7 @@ The result of this example matching is:
```
**Example 4**: *Match third level folders starting with 202410 and files
ending with .csv*, the Regular Expression:
```
-/data/seatunnel/202410\d*/.*\.csv
+/data/seatunnel/202410\d*/.*.csv
```
The result of this example matching is:
```
@@ -413,7 +413,7 @@ File modification time filter. The connector will filter
some files base on the
### file_filter_modified_end [string]
-File modification time filter. The connector will filter some files base on
the last modification end time (not include end time). The default data format
is `yyyy-MM-dd HH:mm:ss`.
+File modification time filter. The connector will filter some files based on the last modification end time (the end time is excluded). The default date format is `yyyy-MM-dd HH:mm:ss`.
### common options
diff --git a/docs/en/connector-v2/source/OssFile.md
b/docs/en/connector-v2/source/OssFile.md
index d3ed99e235..1bcdf9fa8d 100644
--- a/docs/en/connector-v2/source/OssFile.md
+++ b/docs/en/connector-v2/source/OssFile.md
@@ -265,12 +265,12 @@ Whether to read the complete file as a single chunk
instead of splitting into ch
### file_filter_pattern [string]
-Filter pattern, which used for filtering files.
+Filter pattern, which is used for filtering files. If you only want to filter by file name, simply write a regular expression for the file name; if you also want to filter by the file directory, the expression needs to start with the configured `path`.
The pattern follows standard regular expressions. For details, please refer to
https://en.wikipedia.org/wiki/Regular_expression.
Here are some examples.
-File Structure Example:
+If the `path` is `/data/seatunnel` and the file structure is as follows:
```
/data/seatunnel/20241001/report.txt
/data/seatunnel/20241007/abch202410.csv
@@ -282,7 +282,7 @@ Matching Rules Example:
**Example 1**: *Match all .txt files*, Regular Expression:
```
-/data/seatunnel/20241001/.*\.txt
+.*.txt
```
The result of this example matching is:
```
@@ -290,14 +290,14 @@ The result of this example matching is:
```
**Example 2**: *Match all files starting with abc*, Regular Expression:
```
-/data/seatunnel/20241002/abc.*
+abc.*
```
The result of this example matching is:
```
/data/seatunnel/20241007/abch202410.csv
/data/seatunnel/20241002/abcg202410.csv
```
-**Example 3**: *Match all file starting with abc,And the fourth character is
either h or g*, the Regular Expression:
+**Example 3**: *Match all files starting with abc in folder 20241007, and the fourth character is either h or g*, the Regular Expression:
```
/data/seatunnel/20241007/abc[h,g].*
```
@@ -307,7 +307,7 @@ The result of this example matching is:
```
**Example 4**: *Match third level folders starting with 202410 and files
ending with .csv*, the Regular Expression:
```
-/data/seatunnel/202410\d*/.*\.csv
+/data/seatunnel/202410\d*/.*.csv
```
The result of this example matching is:
```
diff --git a/docs/en/connector-v2/source/OssJindoFile.md
b/docs/en/connector-v2/source/OssJindoFile.md
index 8d7634cb91..221a22fa0f 100644
--- a/docs/en/connector-v2/source/OssJindoFile.md
+++ b/docs/en/connector-v2/source/OssJindoFile.md
@@ -299,7 +299,7 @@ Reader the sheet of the workbook.
### file_filter_pattern [string]
-Filter pattern, which used for filtering files.
+Filter pattern, which is used for filtering files. If you only want to filter by file name, simply write a regular expression for the file name; if you also want to filter by the file directory, the expression needs to start with the configured `path`.
The pattern follows standard regular expressions. For details, please refer to
https://en.wikipedia.org/wiki/Regular_expression.
Here are some examples.
@@ -316,7 +316,7 @@ Matching Rules Example:
**Example 1**: *Match all .txt files*, Regular Expression:
```
-/data/seatunnel/20241001/.*\.txt
+.*.txt
```
The result of this example matching is:
```
@@ -324,14 +324,14 @@ The result of this example matching is:
```
**Example 2**: *Match all files starting with abc*, Regular Expression:
```
-/data/seatunnel/20241002/abc.*
+abc.*
```
The result of this example matching is:
```
/data/seatunnel/20241007/abch202410.csv
/data/seatunnel/20241002/abcg202410.csv
```
-**Example 3**: *Match all file starting with abc,And the fourth character is
either h or g*, the Regular Expression:
+**Example 3**: *Match all files starting with abc in folder 20241007, and the fourth character is either h or g*, the Regular Expression:
```
/data/seatunnel/20241007/abc[h,g].*
```
@@ -341,7 +341,7 @@ The result of this example matching is:
```
**Example 4**: *Match third level folders starting with 202410 and files
ending with .csv*, the Regular Expression:
```
-/data/seatunnel/202410\d*/.*\.csv
+/data/seatunnel/202410\d*/.*.csv
```
The result of this example matching is:
```
diff --git a/docs/en/connector-v2/source/S3File.md
b/docs/en/connector-v2/source/S3File.md
index 5ef70e3329..be3b5112cd 100644
--- a/docs/en/connector-v2/source/S3File.md
+++ b/docs/en/connector-v2/source/S3File.md
@@ -257,12 +257,12 @@ default `\n`
### file_filter_pattern [string]
-Filter pattern, which used for filtering files.
+Filter pattern, which is used for filtering files. If you only want to filter by file name, simply write a regular expression for the file name; if you also want to filter by the file directory, the expression needs to start with the configured `path`.
The pattern follows standard regular expressions. For details, please refer to
https://en.wikipedia.org/wiki/Regular_expression.
Here are some examples.
-File Structure Example:
+If the `path` is `/data/seatunnel` and the file structure is as follows:
```
/data/seatunnel/20241001/report.txt
/data/seatunnel/20241007/abch202410.csv
@@ -274,7 +274,7 @@ Matching Rules Example:
**Example 1**: *Match all .txt files*, Regular Expression:
```
-/data/seatunnel/20241001/.*\.txt
+.*.txt
```
The result of this example matching is:
```
@@ -282,14 +282,14 @@ The result of this example matching is:
```
**Example 2**: *Match all files starting with abc*, Regular Expression:
```
-/data/seatunnel/20241002/abc.*
+abc.*
```
The result of this example matching is:
```
/data/seatunnel/20241007/abch202410.csv
/data/seatunnel/20241002/abcg202410.csv
```
-**Example 3**: *Match all file starting with abc,And the fourth character is
either h or g*, the Regular Expression:
+**Example 3**: *Match all files starting with abc in folder 20241007, and the fourth character is either h or g*, the Regular Expression:
```
/data/seatunnel/20241007/abc[h,g].*
```
@@ -299,7 +299,7 @@ The result of this example matching is:
```
**Example 4**: *Match third level folders starting with 202410 and files
ending with .csv*, the Regular Expression:
```
-/data/seatunnel/202410\d*/.*\.csv
+/data/seatunnel/202410\d*/.*.csv
```
The result of this example matching is:
```
diff --git a/docs/en/connector-v2/source/SftpFile.md
b/docs/en/connector-v2/source/SftpFile.md
index 14f4315e67..980ad9ae33 100644
--- a/docs/en/connector-v2/source/SftpFile.md
+++ b/docs/en/connector-v2/source/SftpFile.md
@@ -113,12 +113,12 @@ The File does not have a specific type list, and we can
indicate which SeaTunnel
### file_filter_pattern [string]
-Filter pattern, which used for filtering files.
+Filter pattern, which is used for filtering files. If you only want to filter by file name, simply write a regular expression for the file name; if you also want to filter by the file directory, the expression needs to start with the configured `path`.
The pattern follows standard regular expressions. For details, please refer to
https://en.wikipedia.org/wiki/Regular_expression.
Here are some examples.
-File Structure Example:
+If the `path` is `/data/seatunnel` and the file structure is as follows:
```
/data/seatunnel/20241001/report.txt
/data/seatunnel/20241007/abch202410.csv
@@ -130,7 +130,7 @@ Matching Rules Example:
**Example 1**: *Match all .txt files*, Regular Expression:
```
-/data/seatunnel/20241001/.*\.txt
+.*.txt
```
The result of this example matching is:
```
@@ -138,14 +138,14 @@ The result of this example matching is:
```
**Example 2**: *Match all files starting with abc*, Regular Expression:
```
-/data/seatunnel/20241002/abc.*
+abc.*
```
The result of this example matching is:
```
/data/seatunnel/20241007/abch202410.csv
/data/seatunnel/20241002/abcg202410.csv
```
-**Example 3**: *Match all file starting with abc,And the fourth character is
either h or g*, the Regular Expression:
+**Example 3**: *Match all files starting with abc in folder 20241007, and the fourth character is either h or g*, the Regular Expression:
```
/data/seatunnel/20241007/abc[h,g].*
```
@@ -155,7 +155,7 @@ The result of this example matching is:
```
**Example 4**: *Match third level folders starting with 202410 and files
ending with .csv*, the Regular Expression:
```
-/data/seatunnel/202410\d*/.*\.csv
+/data/seatunnel/202410\d*/.*.csv
```
The result of this example matching is:
```
diff --git a/docs/zh/connector-v2/source/CosFile.md
b/docs/zh/connector-v2/source/CosFile.md
index 5f26a7f70a..e45cdcffa4 100644
--- a/docs/zh/connector-v2/source/CosFile.md
+++ b/docs/zh/connector-v2/source/CosFile.md
@@ -311,12 +311,12 @@ default `HH:mm:ss`
### file_filter_pattern [string]
-过滤模式,用于过滤文件。
+文件过滤模式,用于过滤文件。若只想根据文件名称筛选,则直接写文件名称的正则;若同时想根据文件目录进行过滤,则表达式以`path`起始。
该模式遵循标准正则表达式。详情请参阅https://en.wikipedia.org/wiki/Regular_expression.
有一些例子。
-文件结构示例:
+若`path`为`/data/seatunnel`,且文件结构示例:
```
/data/seatunnel/20241001/report.txt
/data/seatunnel/20241007/abch202410.csv
@@ -328,7 +328,7 @@ default `HH:mm:ss`
**示例1**:*匹配所有.txt文件*,正则表达式:
```
-/data/seatunnel/20241001/.*\.txt
+.*.txt
```
此示例匹配的结果为:
```
@@ -336,14 +336,14 @@ default `HH:mm:ss`
```
**示例2**:*匹配所有以abc*开头的文件,正则表达式:
```
-/data/seatunnel/20241002/abc.*
+abc.*
```
此示例匹配的结果为:
```
/data/seatunnel/20241007/abch202410.csv
/data/seatunnel/20241002/abcg202410.csv
```
-**示例3**:*匹配所有以abc开头的文件,第四个字符是h或g*,正则表达式:
+**示例3**:*匹配20241007文件夹下所有以 abc 开头的文件,且第四个字符为 h 或 g*,正则表达式:
```
/data/seatunnel/20241007/abc[h,g].*
```
@@ -353,7 +353,7 @@ default `HH:mm:ss`
```
**示例4**:*匹配以202410开头的三级文件夹和以.csv*结尾的文件,正则表达式:
```
-/data/seatunnel/202410\d*/.*\.csv
+/data/seatunnel/202410\d*/.*.csv
```
此示例匹配的结果为:
```
diff --git a/docs/zh/connector-v2/source/FtpFile.md
b/docs/zh/connector-v2/source/FtpFile.md
index f7272f45a2..54a48370cb 100644
--- a/docs/zh/connector-v2/source/FtpFile.md
+++ b/docs/zh/connector-v2/source/FtpFile.md
@@ -102,12 +102,12 @@ import ChangeLog from
'../changelog/connector-file-ftp.md';
### file_filter_pattern [string]
-文件过滤模式,用于过滤文件。
+文件过滤模式,用于过滤文件。若只想根据文件名称筛选,则直接写文件名称的正则;若同时想根据文件目录进行过滤,则表达式以`path`起始。
该模式遵循标准正则表达式。详情请参考:https://en.wikipedia.org/wiki/Regular_expression.
以下是一些示例。
-文件结构示例:
+若`path`为`/data/seatunnel`,且文件结构示例:
```
/data/seatunnel/20241001/report.txt
@@ -121,7 +121,7 @@ import ChangeLog from '../changelog/connector-file-ftp.md';
**示例 1**:*匹配所有 .txt 文件*,正则表达式:
```
-/data/seatunnel/20241001/.*\.txt
+.*.txt
```
该示例匹配结果为:
```
@@ -129,14 +129,14 @@ import ChangeLog from
'../changelog/connector-file-ftp.md';
```
**示例 2**:*匹配所有以 abc 开头的文件*,正则表达式:
```
-/data/seatunnel/20241002/abc.*
+abc.*
```
该示例匹配结果为:
```
/data/seatunnel/20241007/abch202410.csv
/data/seatunnel/20241002/abcg202410.csv
```
-**示例 3**:*匹配所有以 abc 开头的文件,且第四个字符为 h 或 g*,正则表达式:
+**示例 3**:*匹配20241007文件夹下所有以 abc 开头的文件,且第四个字符为 h 或 g*,正则表达式:
```
/data/seatunnel/20241007/abc[h,g].*
```
@@ -146,7 +146,7 @@ import ChangeLog from '../changelog/connector-file-ftp.md';
```
**示例 4**:*匹配第三级文件夹以 202410 开头且文件以 .csv 结尾的文件*,正则表达式:
```
-/data/seatunnel/202410\d*/.*\.csv
+/data/seatunnel/202410\d*/.*.csv
```
该示例匹配结果为:
```
@@ -352,7 +352,7 @@ SeaTunnel 将从源文件中跳过前 2 行。
### csv_use_header_line [boolean]
仅在文件格式为 csv 时可以选择配置。
-是否使用标题行来解析文件, 标题行 与 RFC 4180 匹配
+是否使用标题行来解析文件, 标题行 与 RFC 4180 匹配
### compress_codec [string]
diff --git a/docs/zh/connector-v2/source/HdfsFile.md
b/docs/zh/connector-v2/source/HdfsFile.md
index 0c9bc29153..655f25192b 100644
--- a/docs/zh/connector-v2/source/HdfsFile.md
+++ b/docs/zh/connector-v2/source/HdfsFile.md
@@ -119,12 +119,12 @@ markdown 解析器提取各种元素,包括标题、段落、列表、代码
### file_filter_pattern [string]
-过滤模式,用于过滤文件。
+文件过滤模式,用于过滤文件。若只想根据文件名称筛选,则直接写文件名称的正则;若同时想根据文件目录进行过滤,则表达式以`path`起始。
该模式遵循标准正则表达式。详情请参考 https://en.wikipedia.org/wiki/Regular_expression。
以下是一些示例。
-文件结构示例:
+若`path`为`/data/seatunnel`,且文件结构示例:
```
/data/seatunnel/20241001/report.txt
/data/seatunnel/20241007/abch202410.csv
@@ -136,7 +136,7 @@ markdown 解析器提取各种元素,包括标题、段落、列表、代码
**示例 1**:*匹配所有 .txt 文件*,正则表达式:
```
-/data/seatunnel/20241001/.*\.txt
+.*.txt
```
此示例匹配的结果是:
```
@@ -144,14 +144,14 @@ markdown 解析器提取各种元素,包括标题、段落、列表、代码
```
**示例 2**:*匹配所有以 abc 开头的文件*,正则表达式:
```
-/data/seatunnel/20241002/abc.*
+abc.*
```
此示例匹配的结果是:
```
/data/seatunnel/20241007/abch202410.csv
/data/seatunnel/20241002/abcg202410.csv
```
-**示例 3**:*匹配所有以 abc 开头,且第四个字符是 h 或 g 的文件*,正则表达式:
+**示例 3**:*匹配20241007文件夹下所有以 abc 开头的文件,且第四个字符为 h 或 g*,正则表达式:
```
/data/seatunnel/20241007/abc[h,g].*
```
@@ -161,7 +161,7 @@ markdown 解析器提取各种元素,包括标题、段落、列表、代码
```
**示例 4**:*匹配以 202410 开头的第三级文件夹和以 .csv 结尾的文件*,正则表达式:
```
-/data/seatunnel/202410\d*/.*\.csv
+/data/seatunnel/202410\d*/.*.csv
```
此示例匹配的结果是:
```
diff --git a/docs/zh/connector-v2/source/Http.md
b/docs/zh/connector-v2/source/Http.md
index c790217df9..ded864a335 100644
--- a/docs/zh/connector-v2/source/Http.md
+++ b/docs/zh/connector-v2/source/Http.md
@@ -49,7 +49,7 @@ import ChangeLog from '../changelog/connector-http.md';
| pageing.page_type | String | 否 | PageNumber |
此参数用于指定页面类型,如果未设置则为 PageNumber,仅支持 `PageNumber` 和 `Cursor`。
|
| pageing.cursor_field | String | 否 | - |
此参数用于指定请求参数中的游标字段名称。
|
| pageing.cursor_response_field | String | 否 | - |
此参数指定从中检索游标的响应字段。
|
-| content_json | String | 否 | - | 此参数可以获取一些
json 数据。如果您只需要 'book' 部分的数据,配置 `content_field = "$.store.book.*"`。
|
+| content_field | String | 否 | - | 此参数可以获取一些
json 数据。如果您只需要 'book' 部分的数据,配置 `content_field = "$.store.book.*"`。
|
| format | String | 否 | text |
上游数据的格式,目前仅支持 `json` `text`,默认为 `text`。
|
| method | String | 否 | get | Http
请求方法,仅支持 GET、POST 方法。
|
| headers | Map | 否 | - | Http 头信息。
|
@@ -210,7 +210,7 @@ HTTP body 用于在请求或响应中携带实际数据,包括 JSON、表单
参考格式如下:
```hocon
-body="{"id":1,"name":"setunnel"}"
+body="{"id":1,"name":"seatunnel"}"
```
对于表单提交,请按如下设置 content-type。
@@ -220,7 +220,7 @@ headers {
}
```
-### content_json
+### content_field
此参数可以获取一些 json 数据。如果您只需要 'book' 部分的数据,配置 `content_field = "$.store.book.*"`。
diff --git a/docs/zh/connector-v2/source/LocalFile.md
b/docs/zh/connector-v2/source/LocalFile.md
index 752ea87de1..b2eb9b86f5 100644
--- a/docs/zh/connector-v2/source/LocalFile.md
+++ b/docs/zh/connector-v2/source/LocalFile.md
@@ -304,12 +304,12 @@ markdown 解析器提取各种元素,包括标题、段落、列表、代码
### file_filter_pattern [string]
-过滤模式,用于过滤文件。
+文件过滤模式,用于过滤文件。若只想根据文件名称筛选,则直接写文件名称的正则;若同时想根据文件目录进行过滤,则表达式以`path`起始。
该模式遵循标准正则表达式。详情请参考 https://en.wikipedia.org/wiki/Regular_expression。
以下是一些示例。
-文件结构示例:
+若`path`为`/data/seatunnel`,且文件结构示例:
```
/data/seatunnel/20241001/report.txt
/data/seatunnel/20241007/abch202410.csv
@@ -321,7 +321,7 @@ markdown 解析器提取各种元素,包括标题、段落、列表、代码
**示例 1**:*匹配所有 .txt 文件*,正则表达式:
```
-/data/seatunnel/20241001/.*\.txt
+.*.txt
```
此示例匹配的结果是:
```
@@ -329,14 +329,14 @@ markdown 解析器提取各种元素,包括标题、段落、列表、代码
```
**示例 2**:*匹配所有以 abc 开头的文件*,正则表达式:
```
-/data/seatunnel/20241002/abc.*
+abc.*
```
此示例匹配的结果是:
```
/data/seatunnel/20241007/abch202410.csv
/data/seatunnel/20241002/abcg202410.csv
```
-**示例 3**:*匹配所有以 abc 开头,且第四个字符是 h 或 g 的文件*,正则表达式:
+**示例 3**:*匹配20241007文件夹下所有以 abc 开头的文件,且第四个字符为 h 或 g*,正则表达式:
```
/data/seatunnel/20241007/abc[h,g].*
```
@@ -346,7 +346,7 @@ markdown 解析器提取各种元素,包括标题、段落、列表、代码
```
**示例 4**:*匹配以 202410 开头的第三级文件夹和以 .csv 结尾的文件*,正则表达式:
```
-/data/seatunnel/202410\d*/.*\.csv
+/data/seatunnel/202410\d*/.*.csv
```
此示例匹配的结果是:
```
diff --git a/docs/zh/connector-v2/source/OssFile.md
b/docs/zh/connector-v2/source/OssFile.md
index a7fa405586..25bb1bce9c 100644
--- a/docs/zh/connector-v2/source/OssFile.md
+++ b/docs/zh/connector-v2/source/OssFile.md
@@ -264,12 +264,12 @@ markdown 解析器提取各种元素,包括标题、段落、列表、代码
### file_filter_pattern [string]
-过滤模式,用于过滤文件。
+文件过滤模式,用于过滤文件。若只想根据文件名称筛选,则直接写文件名称的正则;若同时想根据文件目录进行过滤,则表达式以`path`起始。
该模式遵循标准正则表达式。详情请参考 https://en.wikipedia.org/wiki/Regular_expression。
以下是一些示例。
-文件结构示例:
+若`path`为`/data/seatunnel`,且文件结构示例:
```
/data/seatunnel/20241001/report.txt
/data/seatunnel/20241007/abch202410.csv
@@ -281,7 +281,7 @@ markdown 解析器提取各种元素,包括标题、段落、列表、代码
**示例1**:*匹配所有.txt文件*,正则表达式:
```
-/data/seatunnel/20241001/.*\.txt
+.*.txt
```
此示例匹配的结果是:
```
@@ -289,14 +289,14 @@ markdown 解析器提取各种元素,包括标题、段落、列表、代码
```
**示例2**:*匹配所有以abc开头的文件*,正则表达式:
```
-/data/seatunnel/20241002/abc.*
+abc.*
```
此示例匹配的结果是:
```
/data/seatunnel/20241007/abch202410.csv
/data/seatunnel/20241002/abcg202410.csv
```
-**示例3**:*匹配所有以abc开头,且第四个字符是h或g的文件*,正则表达式:
+**示例3**:*匹配20241007文件夹下所有以 abc 开头的文件,且第四个字符为 h 或 g*,正则表达式:
```
/data/seatunnel/20241007/abc[h,g].*
```
@@ -306,7 +306,7 @@ markdown 解析器提取各种元素,包括标题、段落、列表、代码
```
**示例4**:*匹配以202410开头的第三级文件夹和以.csv结尾的文件*,正则表达式:
```
-/data/seatunnel/202410\d*/.*\.csv
+/data/seatunnel/202410\d*/.*.csv
```
此示例匹配的结果是:
```
diff --git a/docs/zh/connector-v2/source/S3File.md
b/docs/zh/connector-v2/source/S3File.md
index 1d7b90ed1c..9e4f0b9fca 100644
--- a/docs/zh/connector-v2/source/S3File.md
+++ b/docs/zh/connector-v2/source/S3File.md
@@ -236,12 +236,12 @@ schema {
### file_filter_pattern [string]
-过滤模式,用于过滤文件。
+文件过滤模式,用于过滤文件。若只想根据文件名称筛选,则直接写文件名称的正则;若同时想根据文件目录进行过滤,则表达式以`path`起始。
该模式遵循标准正则表达式。详情请参考 https://en.wikipedia.org/wiki/Regular_expression。
以下是一些示例。
-文件结构示例:
+若`path`为`/data/seatunnel`,且文件结构示例:
```
/data/seatunnel/20241001/report.txt
/data/seatunnel/20241007/abch202410.csv
@@ -253,7 +253,7 @@ schema {
**示例1**:*匹配所有.txt文件*,正则表达式:
```
-/data/seatunnel/20241001/.*\.txt
+.*.txt
```
此示例匹配的结果是:
```
@@ -261,14 +261,14 @@ schema {
```
**示例2**:*匹配所有以abc开头的文件*,正则表达式:
```
-/data/seatunnel/20241002/abc.*
+abc.*
```
此示例匹配的结果是:
```
/data/seatunnel/20241007/abch202410.csv
/data/seatunnel/20241002/abcg202410.csv
```
-**示例3**:*匹配所有以abc开头,且第四个字符是h或g的文件*,正则表达式:
+**示例3**:*匹配20241007文件夹下所有以 abc 开头的文件,且第四个字符为 h 或 g*,正则表达式:
```
/data/seatunnel/20241007/abc[h,g].*
```
@@ -278,7 +278,7 @@ schema {
```
**示例4**:*匹配以202410开头的第三级文件夹和以.csv结尾的文件*,正则表达式:
```
-/data/seatunnel/202410\d*/.*\.csv
+/data/seatunnel/202410\d*/.*.csv
```
此示例匹配的结果是:
```
diff --git a/docs/zh/connector-v2/source/SftpFile.md
b/docs/zh/connector-v2/source/SftpFile.md
index e48f4e81c5..0182f53ba8 100644
--- a/docs/zh/connector-v2/source/SftpFile.md
+++ b/docs/zh/connector-v2/source/SftpFile.md
@@ -113,12 +113,12 @@ import ChangeLog from
'../changelog/connector-file-sftp.md';
### file_filter_pattern [string]
-过滤模式,用于过滤文件。
+文件过滤模式,用于过滤文件。若只想根据文件名称筛选,则直接写文件名称的正则;若同时想根据文件目录进行过滤,则表达式以`path`起始。
该模式遵循标准正则表达式。详情请参考 https://en.wikipedia.org/wiki/Regular_expression。
以下是一些示例。
-文件结构示例:
+若`path`为`/data/seatunnel`,且文件结构示例:
```
/data/seatunnel/20241001/report.txt
/data/seatunnel/20241007/abch202410.csv
@@ -130,7 +130,7 @@ import ChangeLog from '../changelog/connector-file-sftp.md';
**示例1**:*匹配所有.txt文件*,正则表达式:
```
-/data/seatunnel/20241001/.*\.txt
+.*.txt
```
此示例匹配的结果是:
```
@@ -138,14 +138,14 @@ import ChangeLog from
'../changelog/connector-file-sftp.md';
```
**示例2**:*匹配所有以abc开头的文件*,正则表达式:
```
-/data/seatunnel/20241002/abc.*
+abc.*
```
此示例匹配的结果是:
```
/data/seatunnel/20241007/abch202410.csv
/data/seatunnel/20241002/abcg202410.csv
```
-**示例3**:*匹配所有以abc开头,且第四个字符是h或g的文件*,正则表达式:
+**示例3**:*匹配20241007文件夹下所有以 abc 开头的文件,且第四个字符为 h 或 g*,正则表达式:
```
/data/seatunnel/20241007/abc[h,g].*
```
@@ -155,7 +155,7 @@ import ChangeLog from '../changelog/connector-file-sftp.md';
```
**示例4**:*匹配以202410开头的第三级文件夹和以.csv结尾的文件*,正则表达式:
```
-/data/seatunnel/202410\d*/.*\.csv
+/data/seatunnel/202410\d*/.*.csv
```
此示例匹配的结果是:
```
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
index a8c302ab74..3bcda9bd45 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
@@ -92,6 +92,7 @@ public abstract class AbstractReadStrategy implements
ReadStrategy {
protected Pattern pattern;
protected Date fileModifiedStartDate;
protected Date fileModifiedEndDate;
+ protected String fileBasePath;
@Override
public void init(HadoopConf conf) {
@@ -223,6 +224,11 @@ public abstract class AbstractReadStrategy implements
ReadStrategy {
String filterPattern =
pluginConfig.getString(FileBaseSourceOptions.FILE_FILTER_PATTERN.key());
this.pattern = Pattern.compile(filterPattern);
+ // Because 'ConfigFactory.systemProperties()' also contains a 'path' property,
+ // only read 'path' once 'FILE_FILTER_PATTERN' is known to be configured
+ if (pluginConfig.hasPath(FileBaseSourceOptions.FILE_PATH.key())) {
+ fileBasePath =
pluginConfig.getString(FileBaseSourceOptions.FILE_PATH.key());
+ }
}
if
(pluginConfig.hasPath(FileBaseSourceOptions.FILE_FILTER_MODIFIED_START.key())) {
fileModifiedStartDate =
@@ -406,7 +412,15 @@ public abstract class AbstractReadStrategy implements
ReadStrategy {
}
protected boolean filterFileByPattern(FileStatus fileStatus) {
- if (Objects.nonNull(pattern)) {
+ if (Objects.nonNull(pattern) && Objects.nonNull(fileBasePath)) {
+ if (pattern.pattern().startsWith(fileBasePath)) {
+ // filter based on the file directory at the same time
+ String absPath = fileStatus.getPath().toUri().getPath();
+ // absPath.substring(absPath.indexOf(fileBasePath)) keeps the match compatible
+ // with scenarios where fileBasePath is a relative path
+ return
pattern.matcher(absPath.substring(absPath.indexOf(fileBasePath))).matches();
+ }
+ // filter based on file names
return pattern.matcher(fileStatus.getPath().getName()).matches();
}
return true;
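The `absPath.indexOf(fileBasePath)` trick above is what lets a relative `path` (as the FTP e2e configs later in this commit use) line up with the absolute path reported by the file system. A standalone sketch with illustrative paths only:

```java
import java.util.regex.Pattern;

public class RelativeBasePathDemo {
    public static void main(String[] args) {
        // The FTP config declares a relative base path, while FileStatus reports
        // an absolute one; trimming everything before the first occurrence of the
        // base path lets a single pattern serve both forms.
        String fileBasePath = "tmp/seatunnel/read/filter";
        String absPath = "/home/vsftpd/seatunnel/tmp/seatunnel/read/filter/json2025/e2e_2025.json";
        Pattern pattern = Pattern.compile("tmp/seatunnel/read/filter/json202[^/]*/.*\\.json");

        String trimmed = absPath.substring(absPath.indexOf(fileBasePath));
        System.out.println(trimmed); // tmp/seatunnel/read/filter/json2025/e2e_2025.json
        System.out.println(pattern.matcher(trimmed).matches()); // true
    }
}
```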
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/reader/FileFilterPatternTest.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/reader/FileFilterPatternTest.java
new file mode 100644
index 0000000000..3691978dd2
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/reader/FileFilterPatternTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.reader;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
+
+import
org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import
org.apache.seatunnel.connectors.seatunnel.file.source.reader.JsonReadStrategy;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Paths;
+import java.util.List;
+
+import static
org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT;
+
+public class FileFilterPatternTest {
+ /**
+     * Filter based on the file directory as well as the file name; the expression
+     * needs to start with `path`.
+ *
+ * @throws URISyntaxException
+ * @throws IOException
+ */
+ @Test
+ @DisabledOnOs(OS.WINDOWS)
+ public void testJsonFilterPatternWithFilePath() throws URISyntaxException,
IOException {
+ URL filterPattern =
FileFilterPatternTest.class.getResource("/filter-pattern/json");
+ URL conf =
+ ExcelReadStrategyTest.class.getResource(
+ "/filter-pattern/json/json2025/test_read_json.conf");
+ Assertions.assertNotNull(filterPattern);
+ Assertions.assertNotNull(conf);
+ // path
+ String jsonPathDir = filterPattern.toURI().getPath();
+ // the expression needs to start with `path`
+ String fileFilterPattern = jsonPathDir + "/json202[^/]*/.*.json";
+
+ String confPath = Paths.get(conf.toURI()).toString();
+ Config pluginConfig =
+ ConfigFactory.parseFile(new File(confPath))
+ .withValue(
+
FileBaseSourceOptions.FILE_FILTER_PATTERN.key(),
+
ConfigValueFactory.fromAnyRef(fileFilterPattern))
+ .withValue(
+ FileBaseSourceOptions.FILE_PATH.key(),
+ ConfigValueFactory.fromAnyRef(jsonPathDir));
+
+ JsonReadStrategy jsonReadStrategy = new JsonReadStrategy();
+ LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+ jsonReadStrategy.setPluginConfig(pluginConfig);
+ jsonReadStrategy.init(localConf);
+
+ List<String> filterFileNames =
jsonReadStrategy.getFileNamesByPath(jsonPathDir);
+ Assertions.assertEquals(2, filterFileNames.size());
+ String fileName = filterFileNames.get(0);
+ Assertions.assertTrue(fileName.endsWith(".json"));
+ }
+
+ /**
+     * Filter based on file names only; simply write a regular expression for the file name.
+ *
+ * @throws URISyntaxException
+ * @throws IOException
+ */
+ @Test
+ @DisabledOnOs(OS.WINDOWS)
+ public void testJsonFilterPatternWithFileName() throws URISyntaxException,
IOException {
+ URL filterPattern =
FileFilterPatternTest.class.getResource("/filter-pattern/json");
+ URL conf =
+ ExcelReadStrategyTest.class.getResource(
+ "/filter-pattern/json/json2025/test_read_json.conf");
+ Assertions.assertNotNull(filterPattern);
+ Assertions.assertNotNull(conf);
+ // path
+ String jsonPathDir = filterPattern.toURI().getPath();
+ // a plain file-name regular expression is enough here
+ String fileFilterPattern = ".*.json";
+ String confPath = Paths.get(conf.toURI()).toString();
+ Config pluginConfig =
+ ConfigFactory.parseFile(new File(confPath))
+ .withValue(
+
FileBaseSourceOptions.FILE_FILTER_PATTERN.key(),
+
ConfigValueFactory.fromAnyRef(fileFilterPattern))
+ .withValue(
+ FileBaseSourceOptions.FILE_PATH.key(),
+ ConfigValueFactory.fromAnyRef(jsonPathDir));
+ JsonReadStrategy jsonReadStrategy = new JsonReadStrategy();
+ LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+ jsonReadStrategy.setPluginConfig(pluginConfig);
+ jsonReadStrategy.init(localConf);
+
+ List<String> filterFileNames =
jsonReadStrategy.getFileNamesByPath(jsonPathDir);
+ Assertions.assertEquals(3, filterFileNames.size());
+ for (String fileName : filterFileNames) {
+ Assertions.assertTrue(fileName.endsWith(".json"));
+ }
+ }
+
+ public static class LocalConf extends HadoopConf {
+ private static final String HDFS_IMPL =
"org.apache.hadoop.fs.LocalFileSystem";
+ private static final String SCHEMA = "file";
+
+ public LocalConf(String hdfsNameKey) {
+ super(hdfsNameKey);
+ }
+
+ @Override
+ public String getFsHdfsImpl() {
+ return HDFS_IMPL;
+ }
+
+ @Override
+ public String getSchema() {
+ return SCHEMA;
+ }
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/filter-pattern/json/json2024/202401.json
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/filter-pattern/json/json2024/202401.json
new file mode 100644
index 0000000000..d1d8fabdbf
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/filter-pattern/json/json2024/202401.json
@@ -0,0 +1 @@
+{"name": "202401"}
\ No newline at end of file
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/filter-pattern/json/json2025/202501.json
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/filter-pattern/json/json2025/202501.json
new file mode 100644
index 0000000000..014000891b
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/filter-pattern/json/json2025/202501.json
@@ -0,0 +1 @@
+{"name": "202501"}
\ No newline at end of file
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/filter-pattern/json/json2025/test_read_json.conf
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/filter-pattern/json/json2025/test_read_json.conf
new file mode 100644
index 0000000000..295640031c
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/filter-pattern/json/json2025/test_read_json.conf
@@ -0,0 +1,39 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+{
+ sheet_name = "Sheet1"
+ skip_header_row_number = 1
+ schema = {
+ fields {
+ c_bytes = "tinyint"
+ c_short = "smallint"
+ c_int = "int"
+ c_bigint = "bigint"
+ c_string = "string"
+ c_double = "double"
+ c_float = "float"
+ c_decimal = "decimal(10, 2)"
+ c_boolean = "boolean"
+ c_map = "map<string, string>"
+ c_array = "array<string>"
+ c_date = "date"
+ c_datetime = "timestamp"
+ c_time = "time"
+ }
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/filter-pattern/json/people.json
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/filter-pattern/json/people.json
new file mode 100644
index 0000000000..76b148cddc
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/filter-pattern/json/people.json
@@ -0,0 +1 @@
+{"name": "people"}
\ No newline at end of file
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/SeaTunnelFTPFileSystem.java
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/SeaTunnelFTPFileSystem.java
index c6bb9c8aed..dc312d8ff7 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/SeaTunnelFTPFileSystem.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/SeaTunnelFTPFileSystem.java
@@ -271,7 +271,15 @@ public class SeaTunnelFTPFileSystem extends FileSystem {
*/
private Path makeAbsolute(Path workDir, Path path) {
if (path.isAbsolute()) {
- return path;
+ String filePath = path.toUri().getPath();
+ if (filePath.equals("/")) {
+ return workDir;
+ }
+ if (filePath.startsWith(workDir.toUri().getPath())) {
+ return path;
+ }
+ // drop the leading '/' so the path resolves under workDir
+ return new Path(workDir, filePath.substring(1));
}
return new Path(workDir, path);
}
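The revised `makeAbsolute` re-roots absolute paths that fall outside the FTP working directory instead of returning them verbatim. Below is a runnable copy of the new logic with illustrative inputs; the work directory is an assumed example, not taken from the test suite.

```java
import org.apache.hadoop.fs.Path;

public class MakeAbsoluteDemo {

    // Standalone copy of the revised logic above.
    static Path makeAbsolute(Path workDir, Path path) {
        if (path.isAbsolute()) {
            String filePath = path.toUri().getPath();
            if (filePath.equals("/")) {
                return workDir;
            }
            if (filePath.startsWith(workDir.toUri().getPath())) {
                return path;
            }
            // drop the leading '/' so the path resolves under workDir
            return new Path(workDir, filePath.substring(1));
        }
        return new Path(workDir, path);
    }

    public static void main(String[] args) {
        Path workDir = new Path("/home/vsftpd/seatunnel");
        System.out.println(makeAbsolute(workDir, new Path("/")));                // /home/vsftpd/seatunnel
        System.out.println(makeAbsolute(workDir, new Path("/tmp/read/filter"))); // /home/vsftpd/seatunnel/tmp/read/filter
        System.out.println(makeAbsolute(workDir, new Path("tmp/read/filter")));  // /home/vsftpd/seatunnel/tmp/read/filter
    }
}
```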
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java
index 95b3a408e4..575fe7f9ad 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java
@@ -206,6 +206,48 @@ public class FtpFileIT extends TestSuiteBase implements
TestResource {
deleteFileFromContainer(homePath);
}
+ @TestTemplate
+ public void testFtpToAssertForJsonFilter(TestContainer container)
+ throws IOException, InterruptedException {
+
+ ContainerUtil.copyFileIntoContainers(
+ "/json/e2e.json",
+
"/home/vsftpd/seatunnel/tmp/seatunnel/read/filter/json/name=tyrantlucifer/hobby=coding/e2e.json",
+ ftpContainer);
+ ContainerUtil.copyFileIntoContainers(
+ "/json/e2e.json",
+
"/home/vsftpd/seatunnel/tmp/seatunnel/read/filter/json2025/name=tyrantlucifer/hobby=coding/e2e_2025.json",
+ ftpContainer);
+ ContainerUtil.copyFileIntoContainers(
+ "/text/e2e.txt",
+
"/home/vsftpd/seatunnel/tmp/seatunnel/read/filter/json2025/name=tyrantlucifer/hobby=coding/e2e_2025.txt",
+ ftpContainer);
+ ContainerUtil.copyFileIntoContainers(
+ "/json/e2e.json",
+
"/home/vsftpd/seatunnel/tmp/seatunnel/read/filter/json2024/name=tyrantlucifer/hobby=coding/e2e_2024.json",
+ ftpContainer);
+
+ ContainerUtil.copyFileIntoContainers(
+ "/text/e2e.txt",
+
"/home/vsftpd/seatunnel/tmp/seatunnel/read/filter/text/name=tyrantlucifer/hobby=coding/e2e.txt",
+ ftpContainer);
+
+ ftpContainer.execInContainer("sh", "-c", "chmod -R 777
/home/vsftpd/seatunnel/");
+ ftpContainer.execInContainer("sh", "-c", "chown -R ftp:ftp
/home/vsftpd/seatunnel/");
+
+ TestHelper helper = new TestHelper(container);
+ // ----- filter based on the file directory as well; the expression starts with `path` -----
+ helper.execute("/json/ftp_to_access_for_json_path_filter.conf");
+
+ // ----- filter based on file names only -----
+ helper.execute("/json/ftp_to_access_for_json_name_filter.conf");
+
+ // delete path
+ String filterPath = "/home/vsftpd/seatunnel/tmp/seatunnel/read/filter";
+ deleteFileFromContainer(filterPath);
+ }
+
private void assertJobExecution(TestContainer container, String
configPath, List<String> params)
throws IOException, InterruptedException {
Container.ExecResult execResult = container.executeJob(configPath,
params);
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/json/ftp_to_access_for_json_name_filter.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/json/ftp_to_access_for_json_name_filter.conf
new file mode 100644
index 0000000000..615a705c67
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/json/ftp_to_access_for_json_name_filter.conf
@@ -0,0 +1,75 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FtpFile {
+ host = "ftp"
+ port = 21
+ user = seatunnel
+ password = pass
+ path= "tmp/seatunnel/read/filter"
+ file_filter_pattern=".*.json"
+ file_format_type= "json"
+ encoding = "UTF-8"
+ schema = {
+ fields {
+ c_string = string
+ }
+ }
+ }
+}
+
+
+sink {
+ Assert {
+ rules =
+ {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 15
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 15
+ }
+ ],
+ field_rules = [{
+ field_name = c_string
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ },
+ {
+ rule_type = MIN_LENGTH
+ rule_value = 5
+ },
+ {
+ rule_type = MAX_LENGTH
+ rule_value = 5
+ }
+ ]
+ }]
+ }
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/json/ftp_to_access_for_json_path_filter.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/json/ftp_to_access_for_json_path_filter.conf
new file mode 100644
index 0000000000..49b0e95ad3
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/json/ftp_to_access_for_json_path_filter.conf
@@ -0,0 +1,75 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FtpFile {
+ host = "ftp"
+ port = 21
+ user = seatunnel
+ password = pass
+ path= "tmp/seatunnel/read/filter"
+ file_filter_pattern="tmp/seatunnel/read/filter/json202[^/]*/.*.json"
+ file_format_type= "json"
+ encoding = "UTF-8"
+ schema = {
+ fields {
+ c_string = string
+ }
+ }
+ }
+}
+
+
+sink {
+ Assert {
+ rules =
+ {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 10
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 10
+ }
+ ],
+ field_rules = [{
+ field_name = c_string
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ },
+ {
+ rule_type = MIN_LENGTH
+ rule_value = 5
+ },
+ {
+ rule_type = MAX_LENGTH
+ rule_value = 5
+ }
+ ]
+ }]
+ }
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-oss-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/oss/OssFileIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-oss-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/oss/OssFileIT.java
index 3750d82d9b..09dbab45a4 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-oss-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/oss/OssFileIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-oss-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/oss/OssFileIT.java
@@ -94,6 +94,46 @@ public class OssFileIT extends TestSuiteBase {
Assertions.assertEquals(0, extraCommands.getExitCode());
};
+ @TestTemplate
+ public void testOssToAccessForJsonFilter(TestContainer container)
+ throws IOException, InterruptedException {
+ // Copy test files to OSS
+ OssUtils ossUtils = new OssUtils();
+ try {
+ ossUtils.uploadTestFiles(
+ "/json/e2e.json",
+
"test/seatunnel/read/filter/json/name=tyrantlucifer/hobby=coding/e2e.json",
+ true);
+
+ ossUtils.uploadTestFiles(
+ "/json/e2e.json",
+
"test/seatunnel/read/filter/json2025/name=tyrantlucifer/hobby=coding/e2e_2025.json",
+ true);
+ ossUtils.uploadTestFiles(
+ "/text/e2e.txt",
+
"test/seatunnel/read/filter/json2025/name=tyrantlucifer/hobby=coding/e2e_2025.txt",
+ true);
+ ossUtils.uploadTestFiles(
+ "/json/e2e.json",
+
"test/seatunnel/read/filter/json2024/name=tyrantlucifer/hobby=coding/e2e_2024.json",
+ true);
+
+ ossUtils.uploadTestFiles(
+ "/text/e2e.txt",
+
"test/seatunnel/read/filter/text/name=tyrantlucifer/hobby=coding/e2e.txt",
+ true);
+ } finally {
+ ossUtils.close();
+ }
+
+ TestHelper helper = new TestHelper(container);
+ // ----- filter based on the file directory as well; the expression starts with `path` -----
+ helper.execute("oss_to_access_for_json_path_filter.conf");
+ // ----- filter based on file names only -----
+ helper.execute("oss_to_access_for_json_name_filter.conf");
+ }
+
/** Copy data files to oss */
@TestTemplate
public void testOssFileReadAndWrite(TestContainer container)
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-oss-e2e/src/test/resources/json/oss_to_access_for_json_name_filter.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-oss-e2e/src/test/resources/json/oss_to_access_for_json_name_filter.conf
new file mode 100644
index 0000000000..f1fc9a9832
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-oss-e2e/src/test/resources/json/oss_to_access_for_json_name_filter.conf
@@ -0,0 +1,80 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+
+ # You can set spark configuration here
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 1
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ OssFile {
+ bucket = "oss://whale-ops"
+ access_key = "xxxxxxxxxxxxxxxxxxx"
+ access_secret = "xxxxxxxxxxxxxxxxxxx"
+ endpoint = "https://oss-accelerate.aliyuncs.com"
+ path = "/test/seatunnel/read/filter"
+ file_filter_pattern=".*.json"
+ file_format_type = "json"
+ schema = {
+ fields {
+ c_string = string
+ }
+ }
+ }
+}
+
+sink {
+ Assert {
+ rules =
+ {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 15
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 15
+ }
+ ],
+ field_rules = [{
+ field_name = c_string
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ },
+ {
+ rule_type = MIN_LENGTH
+ rule_value = 5
+ },
+ {
+ rule_type = MAX_LENGTH
+ rule_value = 5
+ }
+ ]
+ }]
+ }
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-oss-e2e/src/test/resources/json/oss_to_access_for_json_path_filter.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-oss-e2e/src/test/resources/json/oss_to_access_for_json_path_filter.conf
new file mode 100644
index 0000000000..f689f8cc54
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-oss-e2e/src/test/resources/json/oss_to_access_for_json_path_filter.conf
@@ -0,0 +1,80 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+
+ # You can set spark configuration here
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 1
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ OssFile {
+ bucket = "oss://whale-ops"
+ access_key = "xxxxxxxxxxxxxxxxxxx"
+ access_secret = "xxxxxxxxxxxxxxxxxxx"
+ endpoint = "https://oss-accelerate.aliyuncs.com"
+ path = "/test/seatunnel/read/filter"
+ file_filter_pattern="/test/seatunnel/read/filter/json202[^/]*/.*.json"
+ file_format_type = "json"
+ schema = {
+ fields {
+ c_string = string
+ }
+ }
+ }
+}
+
+sink {
+ Assert {
+ rules = {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 10
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 10
+ }
+ ],
+ field_rules = [{
+ field_name = c_string
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ },
+ {
+ rule_type = MIN_LENGTH
+ rule_value = 5
+ },
+ {
+ rule_type = MAX_LENGTH
+ rule_value = 5
+ }
+ ]
+ }]
+ }
+ }
+}
\ No newline at end of file
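
The path-based variant of the same job relies on `[^/]*` to keep the wildcard
inside a single directory segment, which is what selects the json202x
subdirectories while skipping their siblings. A standalone sketch of that regex
behavior (file names below are made up for illustration):

    import java.util.regex.Pattern;

    public class PathFilterSketch {
        public static void main(String[] args) {
            // `[^/]*` cannot cross a '/' boundary, so it expands within one directory level only.
            Pattern p = Pattern.compile("/test/seatunnel/read/filter/json202[^/]*/.*.json");

            System.out.println(p.matcher(
                    "/test/seatunnel/read/filter/json2025/sub/e2e.json").matches()); // true
            System.out.println(p.matcher(
                    "/test/seatunnel/read/filter/json/sub/e2e.json").matches());     // false
        }
    }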
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-s3-e2e/pom.xml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-s3-e2e/pom.xml
index e33b6273a8..d84681244d 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-s3-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-s3-e2e/pom.xml
@@ -24,10 +24,6 @@
<artifactId>connector-file-s3-e2e</artifactId>
<name>SeaTunnel : E2E : Connector V2 : File S3</name>
- <properties>
- <hadoop-aws.version>3.1.4</hadoop-aws.version>
- </properties>
-
<dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
@@ -42,15 +38,22 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-aws</artifactId>
- <version>${hadoop-aws.version}</version>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-hadoop-aws</artifactId>
+ <version>${project.version}</version>
+ <classifier>optional</classifier>
<scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>jdk.tools</groupId>
+ <artifactId>jdk.tools</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-bundle</artifactId>
- <version>1.11.271</version>
+ <version>1.12.692</version>
<scope>test</scope>
</dependency>
<dependency>
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-s3-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/s3/S3FileIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-s3-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/s3/S3FileIT.java
index 3ef03aad70..fdf02df628 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-s3-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/s3/S3FileIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-s3-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/s3/S3FileIT.java
@@ -71,50 +71,41 @@ public class S3FileIT extends TestSuiteBase {
public void testS3FileReadAndWrite(TestContainer container)
throws IOException, InterruptedException {
// Copy test files to s3
- S3Utils s3Utils = new S3Utils();
- try {
- s3Utils.uploadTestFiles(
- "/json/e2e.json",
-
"test/seatunnel/read/json/name=tyrantlucifer/hobby=coding/e2e.json",
- true);
- Path jsonLzo =
convertToLzoFile(ContainerUtil.getResourcesFile("/json/e2e.json"));
- s3Utils.uploadTestFiles(
- jsonLzo.toString(),
"test/seatunnel/read/lzo_json/e2e.json", false);
- s3Utils.uploadTestFiles(
- "/text/e2e.txt",
-
"test/seatunnel/read/text/name=tyrantlucifer/hobby=coding/e2e.txt",
- true);
- s3Utils.uploadTestFiles(
- "/text/e2e_delimiter.txt",
"test/seatunnel/read/text_delimiter/e2e.txt", true);
- s3Utils.uploadTestFiles(
- "/text/e2e_time_format.txt",
- "test/seatunnel/read/text_time_format/e2e.txt",
- true);
- Path txtLzo =
convertToLzoFile(ContainerUtil.getResourcesFile("/text/e2e.txt"));
- s3Utils.uploadTestFiles(
- txtLzo.toString(), "test/seatunnel/read/lzo_text/e2e.txt",
false);
- s3Utils.uploadTestFiles(
- "/excel/e2e.xlsx",
-
"test/seatunnel/read/excel/name=tyrantlucifer/hobby=coding/e2e.xlsx",
- true);
- s3Utils.uploadTestFiles(
- "/orc/e2e.orc",
-
"test/seatunnel/read/orc/name=tyrantlucifer/hobby=coding/e2e.orc",
- true);
- s3Utils.uploadTestFiles(
- "/parquet/e2e.parquet",
-
"test/seatunnel/read/parquet/name=tyrantlucifer/hobby=coding/e2e.parquet",
- true);
- s3Utils.uploadTestFiles(
- "/excel/e2e.xlsx",
-
"test/seatunnel/read/excel_filter/name=tyrantlucifer/hobby=coding/e2e_filter.xlsx",
- true);
- s3Utils.uploadTestFiles(
- "/text/e2e-text.zip",
"test/seatunnel/read/text_zip/e2e-text.zip", true);
- s3Utils.createDir("tmp/fake_empty");
- } finally {
- s3Utils.close();
- }
+ S3Utils.uploadTestFiles(
+ "/json/e2e.json",
+
"test/seatunnel/read/json/name=tyrantlucifer/hobby=coding/e2e.json",
+ true);
+ Path jsonLzo =
convertToLzoFile(ContainerUtil.getResourcesFile("/json/e2e.json"));
+ S3Utils.uploadTestFiles(jsonLzo.toString(),
"test/seatunnel/read/lzo_json/e2e.json", false);
+ S3Utils.uploadTestFiles(
+ "/text/e2e.txt",
+
"test/seatunnel/read/text/name=tyrantlucifer/hobby=coding/e2e.txt",
+ true);
+ S3Utils.uploadTestFiles(
+ "/text/e2e_delimiter.txt",
"test/seatunnel/read/text_delimiter/e2e.txt", true);
+ S3Utils.uploadTestFiles(
+ "/text/e2e_time_format.txt",
"test/seatunnel/read/text_time_format/e2e.txt", true);
+ Path txtLzo =
convertToLzoFile(ContainerUtil.getResourcesFile("/text/e2e.txt"));
+ S3Utils.uploadTestFiles(txtLzo.toString(),
"test/seatunnel/read/lzo_text/e2e.txt", false);
+ S3Utils.uploadTestFiles(
+ "/excel/e2e.xlsx",
+
"test/seatunnel/read/excel/name=tyrantlucifer/hobby=coding/e2e.xlsx",
+ true);
+ S3Utils.uploadTestFiles(
+ "/orc/e2e.orc",
+
"test/seatunnel/read/orc/name=tyrantlucifer/hobby=coding/e2e.orc",
+ true);
+ S3Utils.uploadTestFiles(
+ "/parquet/e2e.parquet",
+
"test/seatunnel/read/parquet/name=tyrantlucifer/hobby=coding/e2e.parquet",
+ true);
+ S3Utils.uploadTestFiles(
+ "/excel/e2e.xlsx",
+
"test/seatunnel/read/excel_filter/name=tyrantlucifer/hobby=coding/e2e_filter.xlsx",
+ true);
+ S3Utils.uploadTestFiles(
+ "/text/e2e-text.zip",
"test/seatunnel/read/text_zip/e2e-text.zip", true);
+ S3Utils.createDir("tmp/fake_empty");
TestHelper helper = new TestHelper(container);
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-s3-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/s3/S3FileWithFilterIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-s3-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/s3/S3FileWithFilterIT.java
new file mode 100644
index 0000000000..546baf06a3
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-s3-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/s3/S3FileWithFilterIT.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.file.s3;
+
+import org.apache.seatunnel.e2e.common.container.seatunnel.SeaTunnelContainer;
+import org.apache.seatunnel.e2e.common.util.ContainerUtil;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.utility.DockerImageName;
+
+import com.github.dockerjava.api.model.ExposedPort;
+import com.github.dockerjava.api.model.PortBinding;
+import com.github.dockerjava.api.model.Ports;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.nio.file.Paths;
+
+@Slf4j
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class S3FileWithFilterIT extends SeaTunnelContainer {
+ private GenericContainer<?> s3Container;
+
+ private static final String MINIO_IMAGE =
"minio/minio:RELEASE.2024-06-13T22-53-53Z";
+
+ private static final int S3_PORT = 9000;
+
+ private static final String S3_CONTAINER_HOST = "s3";
+
+ protected static final String AWS_SDK_DOWNLOAD =
+
"https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.11.271/aws-java-sdk-bundle-1.11.271.jar";
+ protected static final String HADOOP_AWS_DOWNLOAD =
+
"https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.1.4/hadoop-aws-3.1.4.jar";
+
+ @BeforeAll
+ @Override
+ public void startUp() throws Exception {
+ s3Container =
+ new GenericContainer<>(DockerImageName.parse(MINIO_IMAGE))
+ .withNetwork(NETWORK)
+ .withExposedPorts(S3_PORT)
+ .withNetworkAliases(S3_CONTAINER_HOST)
+ .withCreateContainerCmdModifier(
+ cmd ->
+ cmd.withPortBindings(
+ new PortBinding(
+
Ports.Binding.bindPort(S3_PORT),
+ new
ExposedPort(S3_PORT))))
+ .withLogConsumer(new Slf4jLogConsumer(log))
+ .withEnv("MINIO_ROOT_USER", "minioadmin")
+ .withEnv("MINIO_ROOT_PASSWORD", "minioadmin")
+ .withCommand("server", "/data")
+ .waitingFor(Wait.forLogMessage(".*", 1));
+ s3Container.start();
+
+ super.startUp();
+ }
+
+ @Override
+ public void tearDown() throws Exception {
+ if (s3Container != null) {
+ s3Container.close();
+ }
+ // also release the SeaTunnel container started in startUp()
+ super.tearDown();
+ }
+
+ @Override
+ protected String[] buildStartCommand() {
+ return new String[] {
+ "bash",
+ "-c",
+ "wget -P "
+ + SEATUNNEL_HOME
+ + "lib "
+ + AWS_SDK_DOWNLOAD
+ + " &&"
+ + "wget -P "
+ + SEATUNNEL_HOME
+ + "lib "
+ + HADOOP_AWS_DOWNLOAD
+ + " &&"
+ + ContainerUtil.adaptPathForWin(
+ Paths.get(SEATUNNEL_HOME, "bin",
SERVER_SHELL).toString())
+ };
+ }
+
+ @Test
+ public void testS3ToAssertForJsonFilter() throws IOException,
InterruptedException {
+
+ // Copy test files to s3
+ S3Utils.uploadTestFiles(
+ "/json/e2e.json",
+
"/test/seatunnel/read/filter/json/name=tyrantlucifer/hobby=codin/e2e.json",
+ true);
+
+ S3Utils.uploadTestFiles(
+ "/json/e2e.json",
+
"/test/seatunnel/read/filter/json2025/name=tyrantlucifer/hobby=codin/e2e.json",
+ true);
+
+ S3Utils.uploadTestFiles(
+ "/text/e2e.txt",
+
"/test/seatunnel/read/filter/json2025/name=tyrantlucifer/hobby=codin/e2e_2025.txt",
+ true);
+
+ S3Utils.uploadTestFiles(
+ "/json/e2e.json",
+
"/test/seatunnel/read/filter/json2024/name=tyrantlucifer/hobby=codin/e2e_2024.json",
+ true);
+
+ S3Utils.uploadTestFiles(
+ "/text/e2e.txt",
+
"/test/seatunnel/read/filter/text/name=tyrantlucifer/hobby=codin/e2e.txt",
+ true);
+ // ----- filter on the file directory as well: the expression must
+ // start with the configured `path` -----
+ Container.ExecResult execPathResult =
+ executeJob("/json/s3_to_access_for_json_path_filter.conf");
+ Assertions.assertEquals(0, execPathResult.getExitCode());
+
+ // ----- filter on file names only: write a regular expression for
+ // the file name -----
+ Container.ExecResult execNameResult =
+ executeJob("/json/s3_to_access_for_json_name_filter.conf");
+ Assertions.assertEquals(0, execNameResult.getExitCode());
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-s3-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/s3/S3FileWithMultipleTableIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-s3-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/s3/S3FileWithMultipleTableIT.java
index 34fd443146..bc45fdeece 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-s3-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/s3/S3FileWithMultipleTableIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-s3-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/s3/S3FileWithMultipleTableIT.java
@@ -76,32 +76,27 @@ public class S3FileWithMultipleTableIT extends
TestSuiteBase {
@TestTemplate
public void addTestFiles(TestContainer container) throws IOException,
InterruptedException {
// Copy test files to s3
- S3Utils s3Utils = new S3Utils();
- try {
- s3Utils.uploadTestFiles(
- "/json/e2e.json",
-
"test/seatunnel/read/json/name=tyrantlucifer/hobby=coding/e2e.json",
- true);
- s3Utils.uploadTestFiles(
- "/text/e2e.txt",
-
"test/seatunnel/read/text/name=tyrantlucifer/hobby=coding/e2e.txt",
- true);
- s3Utils.uploadTestFiles(
- "/excel/e2e.xlsx",
-
"test/seatunnel/read/excel/name=tyrantlucifer/hobby=coding/e2e.xlsx",
- true);
- s3Utils.uploadTestFiles(
- "/orc/e2e.orc",
-
"test/seatunnel/read/orc/name=tyrantlucifer/hobby=coding/e2e.orc",
- true);
- s3Utils.uploadTestFiles(
- "/parquet/e2e.parquet",
-
"test/seatunnel/read/parquet/name=tyrantlucifer/hobby=coding/e2e.parquet",
- true);
- s3Utils.createDir("tmp/fake_empty");
- } finally {
- s3Utils.close();
- }
+ S3Utils.uploadTestFiles(
+ "/json/e2e.json",
+
"test/seatunnel/read/json/name=tyrantlucifer/hobby=coding/e2e.json",
+ true);
+ S3Utils.uploadTestFiles(
+ "/text/e2e.txt",
+
"test/seatunnel/read/text/name=tyrantlucifer/hobby=coding/e2e.txt",
+ true);
+ S3Utils.uploadTestFiles(
+ "/excel/e2e.xlsx",
+
"test/seatunnel/read/excel/name=tyrantlucifer/hobby=coding/e2e.xlsx",
+ true);
+ S3Utils.uploadTestFiles(
+ "/orc/e2e.orc",
+
"test/seatunnel/read/orc/name=tyrantlucifer/hobby=coding/e2e.orc",
+ true);
+ S3Utils.uploadTestFiles(
+ "/parquet/e2e.parquet",
+
"test/seatunnel/read/parquet/name=tyrantlucifer/hobby=coding/e2e.parquet",
+ true);
+ S3Utils.createDir("tmp/fake_empty");
}
@TestTemplate
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-s3-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/s3/S3Utils.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-s3-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/s3/S3Utils.java
index 6c3f449d78..ab3dbfc243 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-s3-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/s3/S3Utils.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-s3-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/s3/S3Utils.java
@@ -34,29 +34,32 @@ import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.InputStream;
-public class S3Utils {
+public class S3Utils implements AutoCloseable {
private static Logger logger = LoggerFactory.getLogger(S3Utils.class);
- private static final String ACCESS_KEY = "XXXXXX";
- private static final String SECRET_KEY = "AWS_XXXX";
+ private static final String ACCESS_KEY = "minioadmin";
+ private static final String SECRET_KEY = "minioadmin";
private static final String REGION = "cn-north-1";
- private static final String ENDPOINT =
- "s3.cn-north-1.amazonaws.com.cn"; // For example,
"https://s3.amazonaws.com"
- private String bucket = "ws-package";
+ private static final String ENDPOINT = "http://localhost:9000";
+ private static final String BUCKET = "ws-package";
- private final AmazonS3 s3Client;
+ private static final AmazonS3 S3_CLIENT;
- public S3Utils() {
+ static {
BasicAWSCredentials credentials = new BasicAWSCredentials(ACCESS_KEY,
SECRET_KEY);
-
- this.s3Client =
+ S3_CLIENT =
AmazonS3ClientBuilder.standard()
.withCredentials(new
AWSStaticCredentialsProvider(credentials))
+ .enablePathStyleAccess()
.withEndpointConfiguration(
new
AwsClientBuilder.EndpointConfiguration(ENDPOINT, REGION))
.build();
+
+ if (!S3_CLIENT.doesBucketExistV2(BUCKET)) {
+ S3_CLIENT.createBucket(BUCKET);
+ }
}
- public void uploadTestFiles(
+ public static void uploadTestFiles(
String filePath, String targetFilePath, boolean
isFindFromResource) {
File resourcesFile = null;
if (isFindFromResource) {
@@ -64,21 +67,22 @@ public class S3Utils {
} else {
resourcesFile = new File(filePath);
}
- s3Client.putObject(bucket, targetFilePath, resourcesFile);
+ S3_CLIENT.putObject(BUCKET, targetFilePath, resourcesFile);
}
- public void createDir(String dir) {
+ public static void createDir(String dir) {
ObjectMetadata metadata = new ObjectMetadata();
metadata.setContentLength(0);
InputStream emptyContent = new ByteArrayInputStream(new byte[0]);
PutObjectRequest putObjectRequest =
- new PutObjectRequest(bucket, dir, emptyContent, metadata);
- s3Client.putObject(putObjectRequest);
+ new PutObjectRequest(BUCKET, dir, emptyContent, metadata);
+ S3_CLIENT.putObject(putObjectRequest);
}
- public void close() {
- if (s3Client != null) {
- s3Client.shutdown();
+ @Override
+ public void close() throws Exception {
+ if (S3_CLIENT != null) {
+ S3_CLIENT.shutdown();
}
}
}
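
S3Utils now builds a single static, path-style client in a static initializer
and creates the bucket up front, so the helpers become plain static calls and
the per-test construct/close dance removed from S3FileIT above is no longer
needed. The class keeps implementing AutoCloseable, but close() now shuts down
the shared static client, so it is only safe at the very end of a run. A hedged
usage sketch, assuming the S3Utils from the hunk above:

    // Illustrative only; S3Utils here is the refactored class from this commit.
    public class S3UtilsUsage {
        public static void main(String[] args) throws Exception {
            // Routine use: static helpers, no instance required.
            S3Utils.uploadTestFiles("/json/e2e.json", "test/seatunnel/read/json/e2e.json", true);
            S3Utils.createDir("tmp/fake_empty");

            // Optional final cleanup: closing any instance shuts the client down for all callers.
            try (S3Utils shared = new S3Utils()) {
                // ... last S3 interactions of the run ...
            }
        }
    }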
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-s3-e2e/src/test/resources/json/s3_to_access_for_json_name_filter.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-s3-e2e/src/test/resources/json/s3_to_access_for_json_name_filter.conf
new file mode 100644
index 0000000000..284c50a406
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-s3-e2e/src/test/resources/json/s3_to_access_for_json_name_filter.conf
@@ -0,0 +1,78 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+
+ # You can set spark configuration here
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 1
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = "local"
+}
+
+source {
+ S3File {
+ fs.s3a.endpoint = "http://s3:9000"
+ hadoop_s3_properties={
+ "fs.s3a.path.style.access"="true"
+ "fs.s3a.statistics.enable"="false"
+ }
+ fs.s3a.aws.credentials.provider =
"org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
+ access_key = "minioadmin"
+ secret_key = "minioadmin"
+ bucket = "s3a://ws-package"
+ path = "/test/seatunnel/read/filter"
+ file_filter_pattern = ".*.json"
+ file_format_type = "json"
+ schema = {
+ fields {
+ c_string = string
+ }
+ }
+ }
+}
+
+sink {
+ Assert {
+ rules = {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 15
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 15
+ }
+ ]
+ field_rules = [
+ {
+ field_name = "c_string"
+ field_type = "string"
+ field_value = [
+ { rule_type = NOT_NULL },
+ { rule_type = MIN_LENGTH, rule_value = 5 },
+ { rule_type = MAX_LENGTH, rule_value = 5 }
+ ]
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-s3-e2e/src/test/resources/json/s3_to_access_for_json_path_filter.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-s3-e2e/src/test/resources/json/s3_to_access_for_json_path_filter.conf
new file mode 100644
index 0000000000..2b24333592
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-s3-e2e/src/test/resources/json/s3_to_access_for_json_path_filter.conf
@@ -0,0 +1,78 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+
+ # You can set spark configuration here
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 1
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = "local"
+}
+
+source {
+ S3File {
+ fs.s3a.endpoint = "http://s3:9000"
+ hadoop_s3_properties={
+ "fs.s3a.path.style.access"="true"
+ "fs.s3a.statistics.enable"="false"
+ }
+ fs.s3a.aws.credentials.provider =
"org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
+ access_key = "minioadmin"
+ secret_key = "minioadmin"
+ bucket = "s3a://ws-package"
+ path = "/test/seatunnel/read/filter"
+ file_filter_pattern = "/test/seatunnel/read/filter/json202[^/]*/.*.json"
+ file_format_type = "json"
+ schema = {
+ fields {
+ c_string = string
+ }
+ }
+ }
+}
+
+sink {
+ Assert {
+ rules = {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 10
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 10
+ }
+ ]
+ field_rules = [
+ {
+ field_name = "c_string"
+ field_type = "string"
+ field_value = [
+ { rule_type = NOT_NULL },
+ { rule_type = MIN_LENGTH, rule_value = 5 },
+ { rule_type = MAX_LENGTH, rule_value = 5 }
+ ]
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/fstp/SftpFileIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/fstp/SftpFileIT.java
index fdc9b94cbd..d8bc2efbb9 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/fstp/SftpFileIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/fstp/SftpFileIT.java
@@ -127,6 +127,46 @@ public class SftpFileIT extends TestSuiteBase implements
TestResource {
sftpContainer.execInContainer("sh", "-c", "chown -R seatunnel
/home/seatunnel/tmp/");
}
+ @TestTemplate
+ public void testSftpToAssertJsonFilter(TestContainer container)
+ throws IOException, InterruptedException {
+
+ ContainerUtil.copyFileIntoContainers(
+ "/json/e2e.json",
+
"/home/seatunnel/tmp/seatunnel/read/filter/json/name=tyrantlucifer/hobby=codin/e2e.json",
+ sftpContainer);
+ ContainerUtil.copyFileIntoContainers(
+ "/json/e2e.json",
+
"/home/seatunnel/tmp/seatunnel/read/filter/json2025/name=tyrantlucifer/hobby=coding/e2e_2025.json",
+ sftpContainer);
+ ContainerUtil.copyFileIntoContainers(
+ "/text/e2e.txt",
+
"/home/seatunnel/tmp/seatunnel/read/filter/json2025/name=tyrantlucifer/hobby=coding/e2e_2025.txt",
+ sftpContainer);
+ ContainerUtil.copyFileIntoContainers(
+ "/json/e2e.json",
+
"/home/seatunnel/tmp/seatunnel/read/filter/json2024/name=tyrantlucifer/hobby=coding/e2e_2024.json",
+ sftpContainer);
+
+ ContainerUtil.copyFileIntoContainers(
+ "/text/e2e.txt",
+
"/home/seatunnel/tmp/seatunnel/read/filter/text/name=tyrantlucifer/hobby=coding/e2e.txt",
+ sftpContainer);
+ sftpContainer.execInContainer("sh", "-c", "chown -R seatunnel
/home/seatunnel/tmp/");
+
+ TestHelper helper = new TestHelper(container);
+ // ----- filter on the file directory as well: the expression must
+ // start with the configured `path` -----
+ helper.execute("/json/sftp_to_access_for_json_path_filter.conf");
+
+ // ----- filter on file names only: write a regular expression for
+ // the file name -----
+ helper.execute("/json/sftp_to_access_for_json_name_filter.conf");
+
+ // delete path
+ String filterPath = "/home/seatunnel/tmp/seatunnel/read/filter";
+ deleteFileFromContainer(filterPath);
+ }
+
@TestTemplate
public void testSftpFileReadAndWrite(TestContainer container)
throws IOException, InterruptedException {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/json/sftp_to_access_for_json_name_filter.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/json/sftp_to_access_for_json_name_filter.conf
new file mode 100644
index 0000000000..b9d0538941
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/json/sftp_to_access_for_json_name_filter.conf
@@ -0,0 +1,81 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+
+ # You can set spark configuration here
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 1
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ SftpFile {
+ host = "sftp"
+ port = 22
+ user = seatunnel
+ password = pass
+ path = "tmp/seatunnel/read/filter"
+ file_format_type = "json"
+ plugin_output = "sftp"
+ file_filter_pattern=".*.json"
+ schema = {
+ fields {
+ c_string = string
+ }
+ }
+ }
+}
+
+sink {
+ Assert {
+ rules = {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 15
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 15
+ }
+ ],
+ field_rules = [{
+ field_name = c_string
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ },
+ {
+ rule_type = MIN_LENGTH
+ rule_value = 5
+ },
+ {
+ rule_type = MAX_LENGTH
+ rule_value = 5
+ }
+ ]
+ }]
+ }
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/json/sftp_to_access_for_json_path_filter.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/json/sftp_to_access_for_json_path_filter.conf
new file mode 100644
index 0000000000..373fe930bd
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/json/sftp_to_access_for_json_path_filter.conf
@@ -0,0 +1,81 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+
+ # You can set spark configuration here
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 1
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ SftpFile {
+ host = "sftp"
+ port = 22
+ user = seatunnel
+ password = pass
+ path = "tmp/seatunnel/read/filter"
+ file_format_type = "json"
+ plugin_output = "sftp"
+ file_filter_pattern="tmp/seatunnel/read/filter/json202[^/]*/.*.json"
+ schema = {
+ fields {
+ c_string = string
+ }
+ }
+ }
+}
+
+sink {
+ Assert {
+ rules = {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 10
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 10
+ }
+ ],
+ field_rules = [{
+ field_name = c_string
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ },
+ {
+ rule_type = MIN_LENGTH
+ rule_value = 5
+ },
+ {
+ rule_type = MAX_LENGTH
+ rule_value = 5
+ }
+ ]
+ }]
+ }
+ }
+}
\ No newline at end of file