This is an automated email from the ASF dual-hosted git repository.
lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 07a2125e53 [Feature][Connector-V2] Support sync_mode=update for
FtpFile/SftpFile/LocalFile source (binary) (#10437)
07a2125e53 is described below
commit 07a2125e53e0d7349cb25f67ef0e79db50fe6616
Author: yzeng1618 <[email protected]>
AuthorDate: Wed Feb 4 21:24:12 2026 +0800
[Feature][Connector-V2] Support sync_mode=update for
FtpFile/SftpFile/LocalFile source (binary) (#10437)
Co-authored-by: zengyi <[email protected]>
---
docs/en/connectors/source/FtpFile.md | 86 +++++++++
docs/en/connectors/source/LocalFile.md | 76 ++++++++
docs/en/connectors/source/SftpFile.md | 87 +++++++++
docs/zh/connectors/source/FtpFile.md | 88 ++++++++-
docs/zh/connectors/source/LocalFile.md | 76 ++++++++
docs/zh/connectors/source/SftpFile.md | 87 +++++++++
.../file/source/reader/AbstractReadStrategy.java | 200 ++++++++++++++-----
.../file/source/reader/BinaryReadStrategy.java | 37 ++--
.../file/reader/BinaryReadStrategyTest.java | 35 ++++
.../source/reader/AbstractReadStrategyTest.java | 24 +++
.../file/ftp/source/FtpFileSourceFactory.java | 10 +
.../seatunnel/file/ftp/FtpFileFactoryTest.java | 28 ++-
.../file/local/source/LocalFileSourceFactory.java | 10 +
.../seatunnel/file/local/LocalFileFactoryTest.java | 28 ++-
.../file/sftp/source/SftpFileSourceFactory.java | 10 +
.../seatunnel/file/sftp/SftpFileFactoryTest.java | 28 ++-
.../e2e/connector/file/ftp/FtpFileIT.java | 211 +++++++++++++++++++--
.../resources/text/ftp_binary_update_distcp.conf | 52 +++++
.../e2e/connector/file/local/LocalFileIT.java | 97 ++++++++++
.../binary/local_file_binary_update_distcp.conf | 42 ++++
.../local_file_binary_update_strict_checksum.conf | 42 ++++
.../e2e/connector/file/fstp/SftpFileIT.java | 72 +++++++
.../resources/text/sftp_binary_update_distcp.conf | 59 ++++++
23 files changed, 1402 insertions(+), 83 deletions(-)
diff --git a/docs/en/connectors/source/FtpFile.md
b/docs/en/connectors/source/FtpFile.md
index 6b612cfa4c..48cccd96a7 100644
--- a/docs/en/connectors/source/FtpFile.md
+++ b/docs/en/connectors/source/FtpFile.md
@@ -76,6 +76,11 @@ If you use SeaTunnel Engine, It automatically integrated the
hadoop jar when you
| null_format | string | no | -
|
| binary_chunk_size | int | no | 1024
|
| binary_complete_file_mode | boolean | no | false
|
+| sync_mode | string | no | full
|
+| target_path | string | no | -
|
+| target_hadoop_conf | map | no | -
|
+| update_strategy | string | no | distcp
|
+| compare_mode | string | no | len_mtime
|
| common-options | | no | -
|
| file_filter_modified_start | string | no | -
|
| file_filter_modified_end | string | no | -
|
@@ -434,6 +439,46 @@ Only used when file_format_type is binary.
Whether to read the complete file as a single chunk instead of splitting into
chunks. When enabled, the entire file content will be read into memory at once.
Default is false.
+### sync_mode [string]
+
+File sync mode. Supported values: `full` (default), `update`.
+When set to `update`, the source compares each source file with the file at the same relative path under `target_path` and only reads files that are new or changed (currently only `file_format_type=binary` is supported).
+
+**Performance considerations**
+- Update mode triggers an extra `getFileStatus` call on the target for each
source file.
+- For remote file systems (FTP/SFTP), this adds per-file network overhead, so update mode is not recommended for scenarios with a very large number of small files.
+
+**Requirements / limitations**
+- `target_path` should typically align with sink `path` (same filesystem and
same relative path layout).
+- When `update_strategy=distcp`, correctness depends on source/target clock
synchronization.
+- When `compare_mode=checksum`, filesystem checksum support is required. If
checksum is unavailable, SeaTunnel falls back to content comparison (more
expensive) and logs a warning.
+
+Example:
+
+```hocon
+sync_mode = "update"
+file_format_type = "binary"
+target_path = "/path/to/your/sink/path"
+update_strategy = "distcp"
+compare_mode = "len_mtime"
+```
+
+### target_path [string]
+
+Only used when `sync_mode=update`. Target base path used for comparison (it
should usually be the same as sink `path`).
+
+### target_hadoop_conf [map]
+
+Only used when `sync_mode=update`. Extra Hadoop configuration for target
filesystem. You can set `fs.defaultFS` in this map to override target defaultFS.
+
+### update_strategy [string]
+
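+Only used when `sync_mode=update`. Supported values: `distcp` (default) re-reads an existing target file only when its size differs or the source modification time is newer than the target's; `strict` re-reads it whenever source and target differ according to `compare_mode`.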
+
+### compare_mode [string]
+
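+Only used when `sync_mode=update`. Supported values: `len_mtime` (default) compares file length and modification time; `checksum` compares filesystem checksums and falls back to a content comparison when checksums are unavailable (only valid when `update_strategy=strict`).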
+
### file_filter_modified_start [string]
File modification time filter. The connector will filter some files base on
the last modification start time (include start time). The default data format
is `yyyy-MM-dd HH:mm:ss`.
@@ -570,6 +615,47 @@ sink {
```
+### Incremental Sync (sync_mode=update, binary)
+
+`sync_mode=update` compares files between source and `target_path`, then only
reads new/changed files.
+In most cases, `target_path` should be aligned with sink `path` (same
filesystem and same relative paths).
+
+```hocon
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FtpFile {
+ host = "192.168.31.48"
+ port = 21
+ user = tyrantlucifer
+ password = tianchao
+
+ path = "/seatunnel/read/binary/"
+ file_format_type = "binary"
+
+ sync_mode = "update"
+ target_path = "/seatunnel/read/binary2/"
+ update_strategy = "distcp"
+ compare_mode = "len_mtime"
+ }
+}
+sink {
+ FtpFile {
+ host = "192.168.31.48"
+ port = 21
+ user = tyrantlucifer
+ password = tianchao
+
+ path = "/seatunnel/read/binary2/"
+ tmp_path = "/seatunnel/read/binary2-tmp/"
+ file_format_type = "binary"
+ }
+}
+```
+
### Filter File
```hocon
diff --git a/docs/en/connectors/source/LocalFile.md
b/docs/en/connectors/source/LocalFile.md
index 72c01544c3..3af8d35067 100644
--- a/docs/en/connectors/source/LocalFile.md
+++ b/docs/en/connectors/source/LocalFile.md
@@ -76,6 +76,11 @@ If you use SeaTunnel Engine, It automatically integrated the
hadoop jar when you
| null_format | string | no | -
|
| binary_chunk_size | int | no | 1024
|
| binary_complete_file_mode | boolean | no | false
|
+| sync_mode | string | no | full
|
+| target_path | string | no | -
|
+| target_hadoop_conf | map | no | -
|
+| update_strategy | string | no | distcp
|
+| compare_mode | string | no | len_mtime
|
| common-options | | no | -
|
| tables_configs | list | no | used to define a multiple
table task |
| file_filter_modified_start | string | no | -
|
@@ -410,6 +415,46 @@ Only used when file_format_type is binary.
Whether to read the complete file as a single chunk instead of splitting into
chunks. When enabled, the entire file content will be read into memory at once.
Default is false.
+### sync_mode [string]
+
+File sync mode. Supported values: `full` (default), `update`.
+When set to `update`, the source compares each source file with the file at the same relative path under `target_path` and only reads files that are new or changed (currently only `file_format_type=binary` is supported).
+
+**Performance considerations**
+- Update mode triggers an extra `getFileStatus` call on the target for each
source file.
+- Update mode is not recommended for scenarios with a very large number of small files.
+
+**Requirements / limitations**
+- `target_path` should typically align with sink `path` (same filesystem and
same relative path layout).
+- When `update_strategy=distcp`, correctness depends on source/target clock
synchronization.
+- When `compare_mode=checksum`, filesystem checksum support is required. If
checksum is unavailable, SeaTunnel falls back to content comparison (more
expensive) and logs a warning.
+
+Example:
+
+```hocon
+sync_mode = "update"
+file_format_type = "binary"
+target_path = "/path/to/your/sink/path"
+update_strategy = "distcp"
+compare_mode = "len_mtime"
+```
+
+### target_path [string]
+
+Only used when `sync_mode=update`. Target base path used for comparison (it
should usually be the same as sink `path`).
+
+### target_hadoop_conf [map]
+
+Only used when `sync_mode=update`. Extra Hadoop configuration for target
filesystem. You can set `fs.defaultFS` in this map to override target defaultFS.
+
+### update_strategy [string]
+
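+Only used when `sync_mode=update`. Supported values: `distcp` (default) re-reads an existing target file only when its size differs or the source modification time is newer than the target's; `strict` re-reads it whenever source and target differ according to `compare_mode`.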
+
+### compare_mode [string]
+
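+Only used when `sync_mode=update`. Supported values: `len_mtime` (default) compares file length and modification time; `checksum` compares filesystem checksums and falls back to a content comparison when checksums are unavailable (only valid when `update_strategy=strict`).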
+
### file_filter_modified_start [string]
File modification time filter. The connector will filter some files base on
the last modification start time (include start time). The default data format
is `yyyy-MM-dd HH:mm:ss`.
@@ -575,6 +620,37 @@ sink {
```
+### Incremental Sync (sync_mode=update, binary)
+
+`sync_mode=update` compares files between source and `target_path`, then only
reads new/changed files.
+In most cases, `target_path` should be aligned with sink `path` (same
filesystem and same relative paths).
+
+```hocon
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ LocalFile {
+ path = "/seatunnel/read/binary/"
+ file_format_type = "binary"
+
+ sync_mode = "update"
+ target_path = "/seatunnel/read/binary2/"
+ update_strategy = "distcp"
+ compare_mode = "len_mtime"
+ }
+}
+sink {
+ LocalFile {
+ path = "/seatunnel/read/binary2/"
+ tmp_path = "/seatunnel/read/binary2-tmp/"
+ file_format_type = "binary"
+ }
+}
+```
+
### Filter File
```hocon
diff --git a/docs/en/connectors/source/SftpFile.md
b/docs/en/connectors/source/SftpFile.md
index cc7ae6c94e..9b8b173188 100644
--- a/docs/en/connectors/source/SftpFile.md
+++ b/docs/en/connectors/source/SftpFile.md
@@ -107,6 +107,11 @@ The File does not have a specific type list, and we can
indicate which SeaTunnel
| null_format | string | no | -
| Only used when file_format_type is text. null_format to define which
strings can be represented as null. e.g: `\N`
|
| binary_chunk_size | int | no | 1024
| Only used when file_format_type is binary. The chunk size (in bytes) for
reading binary files. Default is 1024 bytes. Larger values may improve
performance for large files but use more memory.
|
| binary_complete_file_mode | boolean | no | false
| Only used when file_format_type is binary. Whether to read the complete
file as a single chunk instead of splitting into chunks. When enabled, the
entire file content will be read into memory at once. Default is false.
|
+| sync_mode | string | no | full
| File sync mode. Supported values: `full`, `update`. When `update`, the
source compares files between source/target and only reads new/changed files
(currently only supports `file_format_type=binary`).
|
+| target_path | string | no | -
| Only used when `sync_mode=update`. Target base path used for comparison
(it should usually be the same as sink `path`).
|
+| target_hadoop_conf | map | no | -
| Only used when `sync_mode=update`. Extra Hadoop configuration for target
filesystem. You can set `fs.defaultFS` in this map to override target
defaultFS.
|
+| update_strategy | string | no | distcp
| Only used when `sync_mode=update`. Supported values: `distcp` (default),
`strict`.
|
+| compare_mode | string | no | len_mtime
| Only used when `sync_mode=update`. Supported values: `len_mtime`
(default), `checksum` (only valid when `update_strategy=strict`).
|
| common-options | | No | -
| Source plugin common parameters, please refer to [Source Common
Options](../common-options/source-common-options.md) for details.
|
| file_filter_modified_start | string | no | -
| File modification time filter. The connector will filter some files base
on the last modification start time (include start time). The default data
format is `yyyy-MM-dd HH:mm:ss`.
|
| file_filter_modified_end | string | no | -
| File modification time filter. The connector will filter some files base
on the last modification end time (not include end time). The default data
format is `yyyy-MM-dd HH:mm:ss`.
|
@@ -292,6 +297,46 @@ Only used when file_format_type is binary.
Whether to read the complete file as a single chunk instead of splitting into
chunks. When enabled, the entire file content will be read into memory at once.
Default is false.
+### sync_mode [string]
+
+File sync mode. Supported values: `full` (default), `update`.
+When set to `update`, the source compares each source file with the file at the same relative path under `target_path` and only reads files that are new or changed (currently only `file_format_type=binary` is supported).
+
+**Performance considerations**
+- Update mode triggers an extra `getFileStatus` call on the target for each
source file.
+- For remote file systems (FTP/SFTP), this adds per-file network overhead, so update mode is not recommended for scenarios with a very large number of small files.
+
+**Requirements / limitations**
+- `target_path` should typically align with sink `path` (same filesystem and
same relative path layout).
+- When `update_strategy=distcp`, correctness depends on source/target clock
synchronization.
+- When `compare_mode=checksum`, filesystem checksum support is required. If
checksum is unavailable, SeaTunnel falls back to content comparison (more
expensive) and logs a warning.
+
+Example:
+
+```hocon
+sync_mode = "update"
+file_format_type = "binary"
+target_path = "/path/to/your/sink/path"
+update_strategy = "distcp"
+compare_mode = "len_mtime"
+```
+
+### target_path [string]
+
+Only used when `sync_mode=update`. Target base path used for comparison (it
should usually be the same as sink `path`).
+
+### target_hadoop_conf [map]
+
+Only used when `sync_mode=update`. Extra Hadoop configuration for target
filesystem. You can set `fs.defaultFS` in this map to override target defaultFS.
+
+### update_strategy [string]
+
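+Only used when `sync_mode=update`. Supported values: `distcp` (default) re-reads an existing target file only when its size differs or the source modification time is newer than the target's; `strict` re-reads it whenever source and target differ according to `compare_mode`.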
+
+### compare_mode [string]
+
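+Only used when `sync_mode=update`. Supported values: `len_mtime` (default) compares file length and modification time; `checksum` compares filesystem checksums and falls back to a content comparison when checksums are unavailable (only valid when `update_strategy=strict`).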
+
### quote_char [string]
A single character that encloses CSV fields, allowing fields with commas, line
breaks, or quotes to be read correctly.
@@ -439,6 +484,48 @@ sink {
}
}
```
+
+### Incremental Sync (sync_mode=update, binary)
+
+`sync_mode=update` compares files between source and `target_path`, then only
reads new/changed files.
+In most cases, `target_path` should be aligned with sink `path` (same
filesystem and same relative paths).
+
+```hocon
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ SftpFile {
+ host = "sftp"
+ port = 22
+ user = seatunnel
+ password = pass
+
+ path = "tmp/seatunnel/update/src"
+ file_format_type = "binary"
+
+ sync_mode = "update"
+ target_path = "tmp/seatunnel/update/dst"
+ update_strategy = "distcp"
+ compare_mode = "len_mtime"
+ }
+}
+
+sink {
+ SftpFile {
+ host = "sftp"
+ port = 22
+ user = seatunnel
+ password = pass
+
+ path = "tmp/seatunnel/update/dst"
+ tmp_path = "tmp/seatunnel/update/tmp"
+ file_format_type = "binary"
+ }
+}
+```
## Changelog
<ChangeLog />
diff --git a/docs/zh/connectors/source/FtpFile.md
b/docs/zh/connectors/source/FtpFile.md
index 9c10e6d730..1dd2e61ce6 100644
--- a/docs/zh/connectors/source/FtpFile.md
+++ b/docs/zh/connectors/source/FtpFile.md
@@ -72,6 +72,11 @@ import ChangeLog from '../changelog/connector-file-ftp.md';
| null_format | string | 否 | - |
| binary_chunk_size | int | 否 | 1024 |
| binary_complete_file_mode | boolean | 否 | false |
+| sync_mode | string | 否 | full |
+| target_path | string | 否 | - |
+| target_hadoop_conf | map | 否 | - |
+| update_strategy | string | 否 | distcp |
+| compare_mode | string | 否 | len_mtime |
| common-options | | 否 | - |
| file_filter_modified_start | string | 否 | - |
| file_filter_modified_end | string | 否 | - |
@@ -404,6 +409,46 @@ SeaTunnel 将从源文件中跳过前 2 行。
是否将完整文件作为单个块读取,而不是分割成块。启用时,整个文件内容将一次性读入内存。默认为 false。
+### sync_mode [string]
+
+文件同步模式,支持:`full`(默认)、`update`。
+当 `update` 时,对源/目标进行对比,只读取新增/变更文件(目前仅支持 `file_format_type=binary`)。
+
+**性能注意事项**
+- Update 模式会对每个源文件额外发起一次到目标端的 `getFileStatus` 用于对比。
+- 对于远程文件系统(FTP/SFTP),会带来按文件的网络开销,不建议用于海量小文件场景。
+
+**要求 / 限制**
+- `target_path` 通常应与 sink 的 `path` 一致(同一文件系统且相对路径结构一致)。
+- 使用 `update_strategy=distcp` 时,依赖源/目标端时钟同步,否则可能误判。
+- 使用 `compare_mode=checksum` 时,需要文件系统支持 checksum;若无法获取 checksum,SeaTunnel
会降级为内容比较(开销更大)并打印告警日志。
+
+示例:
+
+```hocon
+sync_mode = "update"
+file_format_type = "binary"
+target_path = "/path/to/your/sink/path"
+update_strategy = "distcp"
+compare_mode = "len_mtime"
+```
+
+### target_path [string]
+
+仅在 `sync_mode=update` 时使用。目标端基础路径(通常应与 sink 的 `path` 一致),用于对比同相对路径文件。
+
+### target_hadoop_conf [map]
+
+仅在 `sync_mode=update` 时使用。目标端 Hadoop 配置(可选),可在其中设置 `fs.defaultFS` 覆盖目标
defaultFS。
+
+### update_strategy [string]
+
+仅在 `sync_mode=update` 时使用。支持:`distcp`(默认,目标文件已存在时,仅当文件大小不同或源文件修改时间晚于目标文件时重新读取)、`strict`(源/目标文件按 `compare_mode` 比较存在任何差异时即重新读取)。
+
+### compare_mode [string]
+
+仅在 `sync_mode=update` 时使用。支持:`len_mtime`(默认)、`checksum`(仅在
`update_strategy=strict` 时可用)。
+
### file_filter_modified_start
按照最后修改时间过滤文件。 要过滤的开始时间(包括改时间),时间格式是:`yyyy-MM-dd HH:mm:ss`。
@@ -540,6 +585,47 @@ sink {
```
+### 增量同步(sync_mode=update,仅 binary)
+
+`sync_mode=update` 会对比 source 与 `target_path`,仅读取新增/变更文件。
+多数情况下,`target_path` 需要与 sink 的 `path` 对齐(同一文件系统、相同相对路径)。
+
+```hocon
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FtpFile {
+ host = "192.168.31.48"
+ port = 21
+ user = tyrantlucifer
+ password = tianchao
+
+ path = "/seatunnel/read/binary/"
+ file_format_type = "binary"
+
+ sync_mode = "update"
+ target_path = "/seatunnel/read/binary2/"
+ update_strategy = "distcp"
+ compare_mode = "len_mtime"
+ }
+}
+sink {
+ FtpFile {
+ host = "192.168.31.48"
+ port = 21
+ user = tyrantlucifer
+ password = tianchao
+
+ path = "/seatunnel/read/binary2/"
+ tmp_path = "/seatunnel/read/binary2-tmp/"
+ file_format_type = "binary"
+ }
+}
+```
+
### 过滤文件
```hocon
@@ -569,4 +655,4 @@ sink {
## 变更日志
-<ChangeLog />
\ No newline at end of file
+<ChangeLog />
diff --git a/docs/zh/connectors/source/LocalFile.md
b/docs/zh/connectors/source/LocalFile.md
index ef6dabed25..22dd9391a4 100644
--- a/docs/zh/connectors/source/LocalFile.md
+++ b/docs/zh/connectors/source/LocalFile.md
@@ -76,6 +76,11 @@ import ChangeLog from '../changelog/connector-file-local.md';
| null_format | string | 否 | - |
| binary_chunk_size | int | 否 | 1024 |
| binary_complete_file_mode | boolean | 否 | false |
+| sync_mode | string | 否 | full |
+| target_path | string | 否 | - |
+| target_hadoop_conf | map | 否 | - |
+| update_strategy | string | 否 | distcp |
+| compare_mode | string | 否 | len_mtime |
| common-options | | 否 | - |
| tables_configs | list | 否 | 用于定义多表任务 |
| file_filter_modified_start | string | 否 | - |
@@ -411,6 +416,46 @@ null_format 定义哪些字符串可以表示为 null。
是否将完整文件作为单个块读取,而不是分割成块。启用时,整个文件内容将一次性读入内存。默认为 false。
+### sync_mode [string]
+
+文件同步模式,支持:`full`(默认)、`update`。
+当 `update` 时,对源/目标进行对比,只读取新增/变更文件(目前仅支持 `file_format_type=binary`)。
+
+**性能注意事项**
+- Update 模式会对每个源文件额外发起一次到目标端的 `getFileStatus` 用于对比。
+- 不建议用于海量小文件场景。
+
+**要求 / 限制**
+- `target_path` 通常应与 sink 的 `path` 一致(同一文件系统且相对路径结构一致)。
+- 使用 `update_strategy=distcp` 时,依赖源/目标端时钟同步,否则可能误判。
+- 使用 `compare_mode=checksum` 时,需要文件系统支持 checksum;若无法获取 checksum,SeaTunnel
会降级为内容比较(开销更大)并打印告警日志。
+
+示例:
+
+```hocon
+sync_mode = "update"
+file_format_type = "binary"
+target_path = "/path/to/your/sink/path"
+update_strategy = "distcp"
+compare_mode = "len_mtime"
+```
+
+### target_path [string]
+
+仅在 `sync_mode=update` 时使用。目标端基础路径(通常应与 sink 的 `path` 一致),用于对比同相对路径文件。
+
+### target_hadoop_conf [map]
+
+仅在 `sync_mode=update` 时使用。目标端 Hadoop 配置(可选),可在其中设置 `fs.defaultFS` 覆盖目标
defaultFS。
+
+### update_strategy [string]
+
+仅在 `sync_mode=update` 时使用。支持:`distcp`(默认,目标文件已存在时,仅当文件大小不同或源文件修改时间晚于目标文件时重新读取)、`strict`(源/目标文件按 `compare_mode` 比较存在任何差异时即重新读取)。
+
+### compare_mode [string]
+
+仅在 `sync_mode=update` 时使用。支持:`len_mtime`(默认)、`checksum`(仅在
`update_strategy=strict` 时可用)。
+
### file_filter_modified_start
按照最后修改时间过滤文件。 要过滤的开始时间(包括改时间),时间格式是:`yyyy-MM-dd HH:mm:ss`。
@@ -576,6 +621,37 @@ sink {
```
+### 增量同步(sync_mode=update,仅 binary)
+
+`sync_mode=update` 会对比 source 与 `target_path`,仅读取新增/变更文件。
+多数情况下,`target_path` 需要与 sink 的 `path` 对齐(同一文件系统、相同相对路径)。
+
+```hocon
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ LocalFile {
+ path = "/seatunnel/read/binary/"
+ file_format_type = "binary"
+
+ sync_mode = "update"
+ target_path = "/seatunnel/read/binary2/"
+ update_strategy = "distcp"
+ compare_mode = "len_mtime"
+ }
+}
+sink {
+ LocalFile {
+ path = "/seatunnel/read/binary2/"
+ tmp_path = "/seatunnel/read/binary2-tmp/"
+ file_format_type = "binary"
+ }
+}
+```
+
### 过滤文件
```hocon
diff --git a/docs/zh/connectors/source/SftpFile.md
b/docs/zh/connectors/source/SftpFile.md
index 42dcb346fa..7de83e3af9 100644
--- a/docs/zh/connectors/source/SftpFile.md
+++ b/docs/zh/connectors/source/SftpFile.md
@@ -107,6 +107,11 @@ import ChangeLog from
'../changelog/connector-file-sftp.md';
| null_format | string | 否 | - |
仅在file_format_type为text时使用。null_format用于定义哪些字符串可以表示为null。例如:`\N`
|
| binary_chunk_size | int | 否 | 1024 |
仅在file_format_type为binary时使用。读取二进制文件的块大小(以字节为单位)。默认为1024字节。较大的值可能会提高大文件的性能,但会使用更多内存。
|
| binary_complete_file_mode | boolean | 否 | false |
仅在file_format_type为binary时使用。是否将完整文件作为单个块读取,而不是分割成块。启用时,整个文件内容将一次性读入内存。默认为false。
|
+| sync_mode | string | 否 | full |
文件同步模式,支持:`full`(默认)、`update`。当 `update` 时,对源/目标进行对比,只读取新增/变更文件(目前仅支持
`file_format_type=binary`)。
|
+| target_path | string | 否 | - | 仅在
`sync_mode=update` 时使用。目标端基础路径(通常应与 sink 的 `path` 一致),用于对比同相对路径文件。
|
+| target_hadoop_conf | map | 否 | - | 仅在
`sync_mode=update` 时使用。目标端 Hadoop 配置(可选),可在其中设置 `fs.defaultFS` 覆盖目标 defaultFS。
|
+| update_strategy | string | 否 | distcp | 仅在
`sync_mode=update` 时使用。支持:`distcp`(默认)、`strict`。
|
+| compare_mode | string | 否 | len_mtime | 仅在
`sync_mode=update` 时使用。支持:`len_mtime`(默认)、`checksum`(仅在
`update_strategy=strict` 时可用)。
|
| common-options | | 否 | - |
数据源插件通用参数,请参考[数据源通用选项](../common-options/source-common-options.md)了解详情。
|
| file_filter_modified_start | string | 否 | - |
按照最后修改时间过滤文件。 要过滤的开始时间(包括改时间),时间格式是:`yyyy-MM-dd HH:mm:ss`
|
| file_filter_modified_end | string | 否 | - |
按照最后修改时间过滤文件。 要过滤的结束时间(不包括改时间),时间格式是:`yyyy-MM-dd HH:mm:ss`
|
@@ -291,6 +296,46 @@ markdown 解析器提取各种元素,包括标题、段落、列表、代码
是否将完整文件作为单个块读取,而不是分割成块。启用时,整个文件内容将一次性读入内存。默认为false。
+### sync_mode [string]
+
+文件同步模式,支持:`full`(默认)、`update`。
+当 `update` 时,对源/目标进行对比,只读取新增/变更文件(目前仅支持 `file_format_type=binary`)。
+
+**性能注意事项**
+- Update 模式会对每个源文件额外发起一次到目标端的 `getFileStatus` 用于对比。
+- 对于远程文件系统(FTP/SFTP),会带来按文件的网络开销,不建议用于海量小文件场景。
+
+**要求 / 限制**
+- `target_path` 通常应与 sink 的 `path` 一致(同一文件系统且相对路径结构一致)。
+- 使用 `update_strategy=distcp` 时,依赖源/目标端时钟同步,否则可能误判。
+- 使用 `compare_mode=checksum` 时,需要文件系统支持 checksum;若无法获取 checksum,SeaTunnel
会降级为内容比较(开销更大)并打印告警日志。
+
+示例:
+
+```hocon
+sync_mode = "update"
+file_format_type = "binary"
+target_path = "/path/to/your/sink/path"
+update_strategy = "distcp"
+compare_mode = "len_mtime"
+```
+
+### target_path [string]
+
+仅在 `sync_mode=update` 时使用。目标端基础路径(通常应与 sink 的 `path` 一致),用于对比同相对路径文件。
+
+### target_hadoop_conf [map]
+
+仅在 `sync_mode=update` 时使用。目标端 Hadoop 配置(可选),可在其中设置 `fs.defaultFS` 覆盖目标
defaultFS。
+
+### update_strategy [string]
+
+仅在 `sync_mode=update` 时使用。支持:`distcp`(默认,目标文件已存在时,仅当文件大小不同或源文件修改时间晚于目标文件时重新读取)、`strict`(源/目标文件按 `compare_mode` 比较存在任何差异时即重新读取)。
+
+### compare_mode [string]
+
+仅在 `sync_mode=update` 时使用。支持:`len_mtime`(默认)、`checksum`(仅在
`update_strategy=strict` 时可用)。
+
### schema [config]
#### fields [Config]
@@ -430,6 +475,48 @@ sink {
}
}
```
+
+### 增量同步(sync_mode=update,仅 binary)
+
+`sync_mode=update` 会对比 source 与 `target_path`,仅读取新增/变更文件。
+多数情况下,`target_path` 需要与 sink 的 `path` 对齐(同一文件系统、相同相对路径)。
+
+```hocon
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ SftpFile {
+ host = "sftp"
+ port = 22
+ user = seatunnel
+ password = pass
+
+ path = "tmp/seatunnel/update/src"
+ file_format_type = "binary"
+
+ sync_mode = "update"
+ target_path = "tmp/seatunnel/update/dst"
+ update_strategy = "distcp"
+ compare_mode = "len_mtime"
+ }
+}
+
+sink {
+ SftpFile {
+ host = "sftp"
+ port = 22
+ user = seatunnel
+ password = pass
+
+ path = "tmp/seatunnel/update/dst"
+ tmp_path = "tmp/seatunnel/update/tmp"
+ file_format_type = "binary"
+ }
+}
+```
## 变更日志
<ChangeLog />
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
index 80b88cd170..79bc6badf3 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
@@ -124,6 +124,11 @@ public abstract class AbstractReadStrategy implements
ReadStrategy {
protected transient boolean shareTargetFileSystemProxy;
protected transient boolean checksumUnavailableWarned;
+ private static final class UpdateModeStats {
+ private long scanned;
+ private long skipped;
+ }
+
@Override
public void init(HadoopConf conf) {
this.hadoopConf = conf;
@@ -147,46 +152,74 @@ public abstract class AbstractReadStrategy implements
ReadStrategy {
@Override
public List<String> getFileNamesByPath(String path) throws IOException {
ArrayList<String> fileNames = new ArrayList<>();
+ UpdateModeStats updateModeStats = enableUpdateSync ? new
UpdateModeStats() : null;
+ collectFileNamesByPath(path, fileNames, updateModeStats);
+ if (updateModeStats != null) {
+ log.info(
+ "Update sync mode statistics: scanned={}, skipped={},
to_sync={}",
+ updateModeStats.scanned,
+ updateModeStats.skipped,
+ updateModeStats.scanned - updateModeStats.skipped);
+ }
+ return fileNames;
+ }
+
+ private void collectFileNamesByPath(
+ String path, List<String> fileNames, UpdateModeStats
updateModeStats)
+ throws IOException {
FileStatus[] stats = hadoopFileSystemProxy.listStatus(path);
for (FileStatus fileStatus : stats) {
if (fileStatus.isDirectory()) {
// skip hidden tmp directory, such as .hive-staging_hive
if (!fileStatus.getPath().getName().startsWith(".")) {
-
fileNames.addAll(getFileNamesByPath(fileStatus.getPath().toString()));
+ collectFileNamesByPath(
+ fileStatus.getPath().toString(), fileNames,
updateModeStats);
}
continue;
}
- if (fileStatus.isFile() && filterFileByPattern(fileStatus) &&
fileStatus.getLen() > 0) {
- // filter '_SUCCESS' file
- if (!fileStatus.getPath().getName().equals("_SUCCESS")
- && !fileStatus.getPath().getName().startsWith(".")
- && filterFileByModificationDate(fileStatus)) {
-
- String filePath = fileStatus.getPath().toString();
- if (!readPartitions.isEmpty()) {
- for (String readPartition : readPartitions) {
- if (filePath.contains(readPartition)) {
- if (shouldSyncFileInUpdateMode(fileStatus)) {
- fileNames.add(filePath);
- this.fileNames.add(filePath);
- }
- break;
- }
- }
- } else {
- if (shouldSyncFileInUpdateMode(fileStatus)) {
- fileNames.add(filePath);
- this.fileNames.add(filePath);
- }
+ if (!fileStatus.isFile()
+ || !filterFileByPattern(fileStatus)
+ || fileStatus.getLen() <= 0) {
+ continue;
+ }
+
+ // filter '_SUCCESS' file and hidden files
+ String fileName = fileStatus.getPath().getName();
+ if (fileName.equals("_SUCCESS")
+ || fileName.startsWith(".")
+ || !filterFileByModificationDate(fileStatus)) {
+ continue;
+ }
+
+ String filePath = fileStatus.getPath().toString();
+ if (StringUtils.isNotEmpty(filenameExtension)
+ && !filePath.endsWith(filenameExtension)) {
+ continue;
+ }
+
+ if (!readPartitions.isEmpty()) {
+ boolean partitionMatched = false;
+ for (String readPartition : readPartitions) {
+ if (filePath.contains(readPartition)) {
+ partitionMatched = true;
+ break;
}
}
+ if (!partitionMatched) {
+ continue;
+ }
+ }
+
+ if (updateModeStats != null) {
+ updateModeStats.scanned++;
+ }
+ if (shouldSyncFileInUpdateMode(fileStatus)) {
+ fileNames.add(filePath);
+ this.fileNames.add(filePath);
+ } else if (updateModeStats != null) {
+ updateModeStats.skipped++;
}
}
- if (StringUtils.isNotEmpty(filenameExtension)) {
- this.fileNames.removeIf(fileName ->
!fileName.endsWith(filenameExtension));
- fileNames.removeIf(fileName ->
!fileName.endsWith(filenameExtension));
- }
- return fileNames;
}
private Date getFileModifiedDate(String modifiedDate) {
@@ -304,6 +337,12 @@ public abstract class AbstractReadStrategy implements
ReadStrategy {
enableUpdateSync = syncMode == FileSyncMode.UPDATE;
if (enableUpdateSync) {
validateUpdateSyncConfig(pluginConfig);
+ log.info(
+ "Update sync mode enabled: source_path={}, target_path={},
update_strategy={}, compare_mode={}",
+ maskUriUserInfo(sourceRootPath),
+ maskUriUserInfo(targetPath),
+ updateStrategy.name().toLowerCase(Locale.ROOT),
+ compareMode.name().toLowerCase(Locale.ROOT));
}
}
@@ -711,37 +750,73 @@ public abstract class AbstractReadStrategy implements
ReadStrategy {
long targetMtime = targetFileStatus.getModificationTime();
if (updateStrategy == FileUpdateStrategy.DISTCP) {
- return sourceMtime > targetMtime;
+ if (sourceMtime > targetMtime) {
+ return true;
+ }
+ logUpdateModeSkip(sourceFilePath, targetFilePath, "distcp: target
newer or same");
+ return false;
}
if (updateStrategy == FileUpdateStrategy.STRICT) {
if (compareMode == FileCompareMode.LEN_MTIME) {
- return sourceMtime != targetMtime;
+ if (sourceMtime != targetMtime) {
+ return true;
+ }
+ logUpdateModeSkip(
+ sourceFilePath, targetFilePath, "strict len_mtime: len
and mtime equal");
+ return false;
}
if (compareMode == FileCompareMode.CHECKSUM) {
- FileChecksum sourceChecksum =
hadoopFileSystemProxy.getFileChecksum(sourceFilePath);
- FileChecksum targetChecksum =
-
targetHadoopFileSystemProxy.getFileChecksum(targetFilePath);
- if (sourceChecksum == null || targetChecksum == null) {
+ FileChecksum sourceChecksum = null;
+ FileChecksum targetChecksum = null;
+ Exception checksumException = null;
+ try {
+ sourceChecksum =
hadoopFileSystemProxy.getFileChecksum(sourceFilePath);
+ targetChecksum =
targetHadoopFileSystemProxy.getFileChecksum(targetFilePath);
+ } catch (Exception e) {
+ checksumException = e;
+ }
+
+ if (checksumException != null || sourceChecksum == null ||
targetChecksum == null) {
if (!checksumUnavailableWarned) {
- log.warn(
- "File checksum is not available, fallback to
content comparison. source={}, target={}",
- sourceFilePath,
- targetFilePath);
+ if (checksumException == null) {
+ log.warn(
+ "File checksum is not available, fallback
to content comparison. source={}, target={}",
+ maskUriUserInfo(sourceFilePath),
+ maskUriUserInfo(targetFilePath));
+ } else {
+ log.warn(
+ "File checksum is not available, fallback
to content comparison. source={}, target={}",
+ maskUriUserInfo(sourceFilePath),
+ maskUriUserInfo(targetFilePath),
+ checksumException);
+ }
checksumUnavailableWarned = true;
}
try {
- return !fileContentEquals(sourceFilePath,
targetFilePath);
+ boolean sameContent =
fileContentEquals(sourceFilePath, targetFilePath);
+ if (sameContent) {
+ logUpdateModeSkip(
+ sourceFilePath,
+ targetFilePath,
+ "strict checksum: content equal (checksum
unavailable)");
+ }
+ return !sameContent;
} catch (Exception e) {
log.warn(
"Fallback content comparison failed, fallback
to COPY. source={}, target={}",
- sourceFilePath,
- targetFilePath,
+ maskUriUserInfo(sourceFilePath),
+ maskUriUserInfo(targetFilePath),
e);
return true;
}
}
- return !checksumEquals(sourceChecksum, targetChecksum);
+ if (checksumEquals(sourceChecksum, targetChecksum)) {
+ logUpdateModeSkip(
+ sourceFilePath, targetFilePath, "strict checksum:
checksum equal");
+ return false;
+ }
+ return true;
}
}
@@ -790,7 +865,17 @@ public abstract class AbstractReadStrategy implements
ReadStrategy {
return new Path(targetBasePath, cleanRelativePath).toString();
}
- private static String resolveRelativePath(String basePath, String
fullFilePath) {
+ /**
+ * Resolve relative path from {@code basePath} to {@code fullFilePath}.
+ *
+ * <p><b>NOTE:</b> This method is intended for internal use by specific
read strategies (for
+ * example {@link BinaryReadStrategy}) that need custom path resolution
logic.
+ *
+ * @param basePath base directory path
+ * @param fullFilePath full file path
+ * @return relative path from base to file
+ */
+ protected static String resolveRelativePath(String basePath, String
fullFilePath) {
String base = normalizePathPart(basePath);
String file = normalizePathPart(fullFilePath);
if (StringUtils.isBlank(file)) {
@@ -824,6 +909,35 @@ public abstract class AbstractReadStrategy implements
ReadStrategy {
}
}
+    /**
+     * Masks the user-info component (for example {@code user:password}) of a URI-style path so
+     * that credentials are not written to logs.
+     *
+     * @param rawPath path to mask; may be {@code null} or blank
+     * @return the path with its user info replaced by {@code ***}, or {@code rawPath} unchanged
+     *     when it is blank, has no user info, or cannot be parsed
+     */
+    private static String maskUriUserInfo(String rawPath) {
+        if (StringUtils.isBlank(rawPath)) {
+            return rawPath;
+        }
+        try {
+            java.net.URI uri = new Path(rawPath).toUri();
+            String userInfo = uri.getUserInfo();
+            String authority = uri.getAuthority();
+            if (userInfo == null || authority == null) {
+                return rawPath;
+            }
+            StringBuilder masked = new StringBuilder();
+            // A scheme-less authority ("//user@host/p") must not be rendered as "null://...".
+            if (uri.getScheme() != null) {
+                masked.append(uri.getScheme()).append(':');
+            }
+            masked.append("//").append(authority.replace(userInfo + "@", "***@"));
+            if (uri.getPath() != null) {
+                masked.append(uri.getPath());
+            }
+            return masked.toString();
+        } catch (Exception e) {
+            // Best effort: masking is only for log output and must never fail the caller.
+            return rawPath;
+        }
+    }
+
+    /**
+     * Records at debug level that update sync mode skipped a file, with credentials masked in
+     * both paths. Masking is done only when debug logging is enabled.
+     *
+     * @param sourceFilePath source file that was not copied
+     * @param targetFilePath corresponding target file
+     * @param reason human-readable explanation of why the copy was skipped
+     */
+    private void logUpdateModeSkip(String sourceFilePath, String targetFilePath, String reason) {
+        if (!log.isDebugEnabled()) {
+            return;
+        }
+        String maskedSource = maskUriUserInfo(sourceFilePath);
+        String maskedTarget = maskUriUserInfo(targetFilePath);
+        log.debug(
+                "Update sync mode skipped file: source={}, target={}, reason={}",
+                maskedSource,
+                maskedTarget,
+                reason);
+    }
+
private static <E extends Enum<E>> E parseEnumValue(
Class<E> enumClass, String rawValue, String optionKey) {
if (StringUtils.isBlank(rawValue)) {
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/BinaryReadStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/BinaryReadStrategy.java
index af6431646a..a5867d562d 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/BinaryReadStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/BinaryReadStrategy.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.file.source.reader;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.MetadataUtil;
@@ -29,8 +30,8 @@ import
org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.Path;
-import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
@@ -45,7 +46,8 @@ public class BinaryReadStrategy extends AbstractReadStrategy {
PrimitiveByteArrayType.INSTANCE,
BasicType.STRING_TYPE, BasicType.LONG_TYPE
});
- private File basePath;
+ private String basePath;
+ private transient boolean basePathIsFile;
private int binaryChunkSize =
FileBaseSourceOptions.BINARY_CHUNK_SIZE.defaultValue();
private boolean completeFileMode =
FileBaseSourceOptions.BINARY_COMPLETE_FILE_MODE.defaultValue();
@@ -53,7 +55,16 @@ public class BinaryReadStrategy extends AbstractReadStrategy
{
@Override
public void init(HadoopConf conf) {
super.init(conf);
- basePath = new
File(pluginConfig.getString(FileBaseSourceOptions.FILE_PATH.key()));
+ basePath =
pluginConfig.getString(FileBaseSourceOptions.FILE_PATH.key());
+ try {
+ basePathIsFile = hadoopFileSystemProxy.isFile(basePath);
+ } catch (IOException e) {
+ throw new FileConnectorException(
+ SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+ "Failed to determine whether file source path is a file or
directory: "
+ + basePath,
+ e);
+ }
// Load binary chunk size configuration
if
(pluginConfig.hasPath(FileBaseSourceOptions.BINARY_CHUNK_SIZE.key())) {
@@ -80,18 +91,7 @@ public class BinaryReadStrategy extends AbstractReadStrategy
{
public void read(String path, String tableId, Collector<SeaTunnelRow>
output)
throws IOException, FileConnectorException {
try (InputStream inputStream =
hadoopFileSystemProxy.getInputStream(path)) {
- String relativePath;
- if (hadoopFileSystemProxy.isFile(basePath.getAbsolutePath())) {
- relativePath = basePath.getName();
- } else {
- relativePath =
- path.substring(
- path.indexOf(basePath.getAbsolutePath())
- + basePath.getAbsolutePath().length());
- if (relativePath.startsWith(File.separator)) {
- relativePath =
relativePath.substring(File.separator.length());
- }
- }
+ String relativePath = resolveBinaryRelativePath(path);
if (completeFileMode) {
// Read entire file as a single chunk
@@ -109,6 +109,13 @@ public class BinaryReadStrategy extends
AbstractReadStrategy {
}
}
+    /**
+     * Resolves the relative path used for a binary file record. When the configured source path
+     * is itself a file, this is just the file name; otherwise it is {@code filePath} relative to
+     * the configured base path.
+     *
+     * @param filePath full path of the file being read
+     * @return relative path for {@code filePath}
+     */
+    private String resolveBinaryRelativePath(String filePath) {
+        return basePathIsFile
+                ? new Path(filePath).getName()
+                : resolveRelativePath(basePath, filePath);
+    }
+
/** Read the entire file as a single chunk. */
private void readCompleteFile(
InputStream inputStream,
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/reader/BinaryReadStrategyTest.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/reader/BinaryReadStrategyTest.java
index fd969c2095..259e997d7b 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/reader/BinaryReadStrategyTest.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/reader/BinaryReadStrategyTest.java
@@ -139,8 +139,43 @@ public class BinaryReadStrategyTest {
Assertions.assertEquals(0L, row.getField(2));
}
+    @Test
+    public void testBinaryRelativePath_WhenBaseIsFile() throws IOException {
+        // When the configured path is a single file, the relative path is just its file name.
+        String fileName = "test_binary_base_is_file.bin";
+        File testFile = createTestFile(fileName, 10);
+        String absolutePath = testFile.getAbsolutePath();
+
+        binaryReadStrategy.setPluginConfig(createConfig(absolutePath, null, null));
+        binaryReadStrategy.init(localConf);
+
+        TestCollector collector = new TestCollector();
+        binaryReadStrategy.read(absolutePath, "test_table", collector);
+
+        List<SeaTunnelRow> rows = collector.getRows();
+        Assertions.assertFalse(rows.isEmpty());
+        Assertions.assertEquals(fileName, rows.get(0).getField(1));
+    }
+
+    @Test
+    public void testBinaryRelativePath_WhenBaseIsDirectoryWithSubDir() throws IOException {
+        // When the configured path is a directory, a nested file keeps its sub-directory prefix.
+        String relativeName = "subdir/test_binary_in_sub.bin";
+        File testFile = createTestFile(relativeName, 10);
+        String baseDir = tempDir.toFile().getAbsolutePath();
+
+        binaryReadStrategy.setPluginConfig(createConfig(baseDir, null, null));
+        binaryReadStrategy.init(localConf);
+
+        TestCollector collector = new TestCollector();
+        binaryReadStrategy.read(testFile.getAbsolutePath(), "test_table", collector);
+
+        List<SeaTunnelRow> rows = collector.getRows();
+        Assertions.assertFalse(rows.isEmpty());
+        Assertions.assertEquals(relativeName, rows.get(0).getField(1));
+    }
+
private File createTestFile(String fileName, int sizeInBytes) throws
IOException {
File testFile = tempDir.resolve(fileName).toFile();
+ if (testFile.getParentFile() != null) {
+ testFile.getParentFile().mkdirs();
+ }
if (sizeInBytes > 0) {
try (FileOutputStream fos = new FileOutputStream(testFile)) {
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategyTest.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategyTest.java
index c330d8ba36..74b4f7abe9 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategyTest.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategyTest.java
@@ -381,6 +381,30 @@ public class AbstractReadStrategyTest {
}
}
+    @Test
+    void testResolveRelativePathWithSftpUri() {
+        // sftp URIs with host and port: only the part below the base path remains.
+        String actual =
+                AbstractReadStrategy.resolveRelativePath(
+                        "sftp://server:22/path", "sftp://server:22/path/sub/file.txt");
+        Assertions.assertEquals("sub/file.txt", actual);
+    }
+
+    @Test
+    void testResolveRelativePathWithFtpUri() {
+        // ftp URIs: a file directly under the base directory resolves to its bare name.
+        String actual =
+                AbstractReadStrategy.resolveRelativePath(
+                        "ftp://server:21/tmp/seatunnel/read",
+                        "ftp://server:21/tmp/seatunnel/read/file.txt");
+        Assertions.assertEquals("file.txt", actual);
+    }
+
+    @Test
+    void testResolveRelativePathWithCustomSchemeUri() {
+        // A non-standard scheme containing '.' and '_' must still resolve correctly.
+        String base = "default.default_sftp://sftp:22/tmp/seatunnel/update/src";
+        String file = base + "/test.bin_0";
+        String actual = AbstractReadStrategy.resolveRelativePath(base, file);
+        Assertions.assertEquals("test.bin_0", actual);
+    }
+
private static Map<String, Object> buildBasePluginConfigWithPartitions() {
Map<String, Object> config = new HashMap<>();
config.put(FileBaseSourceOptions.FILE_PATH.key(),
"/tmp/dt=2024-01-01");
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java
index 02df6d2989..765fb318a7 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java
@@ -27,6 +27,7 @@ import
org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
import
org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileSyncMode;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
import
org.apache.seatunnel.connectors.seatunnel.file.ftp.config.FtpFileSourceOptions;
@@ -101,6 +102,15 @@ public class FtpFileSourceFactory implements
TableSourceFactory {
.optional(FtpFileSourceOptions.FTP_CONTROL_ENCODING)
.optional(FileBaseSourceOptions.QUOTE_CHAR)
.optional(FileBaseSourceOptions.ESCAPE_CHAR)
+ .optional(
+ FileBaseSourceOptions.SYNC_MODE,
+ FileBaseSourceOptions.TARGET_HADOOP_CONF,
+ FileBaseSourceOptions.UPDATE_STRATEGY,
+ FileBaseSourceOptions.COMPARE_MODE)
+ .conditional(
+ FileBaseSourceOptions.SYNC_MODE,
+ FileSyncMode.UPDATE,
+ FileBaseSourceOptions.TARGET_PATH)
.build();
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/FtpFileFactoryTest.java
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/FtpFileFactoryTest.java
index 529d1be814..dcf098e05f 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/FtpFileFactoryTest.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/FtpFileFactoryTest.java
@@ -17,6 +17,11 @@
package org.apache.seatunnel.connectors.seatunnel.file.ftp;
+import org.apache.seatunnel.api.configuration.util.Expression;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.configuration.util.RequiredOption;
+import
org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileSyncMode;
import
org.apache.seatunnel.connectors.seatunnel.file.ftp.sink.FtpFileSinkFactory;
import
org.apache.seatunnel.connectors.seatunnel.file.ftp.source.FtpFileSourceFactory;
@@ -27,7 +32,28 @@ class FtpFileFactoryTest {
@Test
void optionRule() {
- Assertions.assertNotNull((new FtpFileSourceFactory()).optionRule());
+ OptionRule optionRule = (new FtpFileSourceFactory()).optionRule();
+ Assertions.assertNotNull(optionRule);
+ Assertions.assertTrue(
+
optionRule.getOptionalOptions().contains(FileBaseSourceOptions.SYNC_MODE));
+ Assertions.assertTrue(
+
optionRule.getOptionalOptions().contains(FileBaseSourceOptions.TARGET_HADOOP_CONF));
+ Assertions.assertTrue(
+
optionRule.getOptionalOptions().contains(FileBaseSourceOptions.UPDATE_STRATEGY));
+ Assertions.assertTrue(
+
optionRule.getOptionalOptions().contains(FileBaseSourceOptions.COMPARE_MODE));
+
+ Expression expectExpression =
+ Expression.of(FileBaseSourceOptions.SYNC_MODE,
FileSyncMode.UPDATE);
+ Assertions.assertTrue(
+ optionRule.getRequiredOptions().stream()
+
.filter(RequiredOption.ConditionalRequiredOptions.class::isInstance)
+
.map(RequiredOption.ConditionalRequiredOptions.class::cast)
+ .filter(
+ required ->
+ required.getOptions()
+
.contains(FileBaseSourceOptions.TARGET_PATH))
+ .anyMatch(required ->
expectExpression.equals(required.getExpression())));
Assertions.assertNotNull((new FtpFileSinkFactory()).optionRule());
}
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
index 987558810e..a035b19f1a 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
@@ -29,6 +29,7 @@ import
org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseOptions;
import
org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSinkOptions;
import
org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileSyncMode;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
import
org.apache.seatunnel.connectors.seatunnel.file.local.config.LocalFileSourceOptions;
@@ -108,6 +109,15 @@ public class LocalFileSourceFactory implements
TableSourceFactory {
.optional(FileBaseSourceOptions.READ_COLUMNS)
.optional(FileBaseSourceOptions.QUOTE_CHAR)
.optional(FileBaseSourceOptions.ESCAPE_CHAR)
+ .optional(
+ FileBaseSourceOptions.SYNC_MODE,
+ FileBaseSourceOptions.TARGET_HADOOP_CONF,
+ FileBaseSourceOptions.UPDATE_STRATEGY,
+ FileBaseSourceOptions.COMPARE_MODE)
+ .conditional(
+ FileBaseSourceOptions.SYNC_MODE,
+ FileSyncMode.UPDATE,
+ FileBaseSourceOptions.TARGET_PATH)
.build();
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/local/LocalFileFactoryTest.java
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/local/LocalFileFactoryTest.java
index 693f81578a..859d9d3148 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/local/LocalFileFactoryTest.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/local/LocalFileFactoryTest.java
@@ -17,6 +17,11 @@
package org.apache.seatunnel.connectors.seatunnel.file.local;
+import org.apache.seatunnel.api.configuration.util.Expression;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.configuration.util.RequiredOption;
+import
org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileSyncMode;
import
org.apache.seatunnel.connectors.seatunnel.file.local.sink.LocalFileSinkFactory;
import
org.apache.seatunnel.connectors.seatunnel.file.local.source.LocalFileSourceFactory;
@@ -28,6 +33,27 @@ class LocalFileFactoryTest {
@Test
void optionRule() {
Assertions.assertNotNull((new LocalFileSinkFactory()).optionRule());
- Assertions.assertNotNull((new LocalFileSourceFactory()).optionRule());
+ OptionRule optionRule = (new LocalFileSourceFactory()).optionRule();
+ Assertions.assertNotNull(optionRule);
+ Assertions.assertTrue(
+
optionRule.getOptionalOptions().contains(FileBaseSourceOptions.SYNC_MODE));
+ Assertions.assertTrue(
+
optionRule.getOptionalOptions().contains(FileBaseSourceOptions.TARGET_HADOOP_CONF));
+ Assertions.assertTrue(
+
optionRule.getOptionalOptions().contains(FileBaseSourceOptions.UPDATE_STRATEGY));
+ Assertions.assertTrue(
+
optionRule.getOptionalOptions().contains(FileBaseSourceOptions.COMPARE_MODE));
+
+ Expression expectExpression =
+ Expression.of(FileBaseSourceOptions.SYNC_MODE,
FileSyncMode.UPDATE);
+ Assertions.assertTrue(
+ optionRule.getRequiredOptions().stream()
+
.filter(RequiredOption.ConditionalRequiredOptions.class::isInstance)
+
.map(RequiredOption.ConditionalRequiredOptions.class::cast)
+ .filter(
+ required ->
+ required.getOptions()
+
.contains(FileBaseSourceOptions.TARGET_PATH))
+ .anyMatch(required ->
expectExpression.equals(required.getExpression())));
}
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java
b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java
index 6ed53d3bed..eb363eb22c 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java
@@ -27,6 +27,7 @@ import
org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
import
org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileSyncMode;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
import
org.apache.seatunnel.connectors.seatunnel.file.sftp.config.SftpFileSourceOptions;
@@ -90,6 +91,15 @@ public class SftpFileSourceFactory implements
TableSourceFactory {
.optional(FileBaseSourceOptions.READ_COLUMNS)
.optional(FileBaseSourceOptions.QUOTE_CHAR)
.optional(FileBaseSourceOptions.ESCAPE_CHAR)
+ .optional(
+ FileBaseSourceOptions.SYNC_MODE,
+ FileBaseSourceOptions.TARGET_HADOOP_CONF,
+ FileBaseSourceOptions.UPDATE_STRATEGY,
+ FileBaseSourceOptions.COMPARE_MODE)
+ .conditional(
+ FileBaseSourceOptions.SYNC_MODE,
+ FileSyncMode.UPDATE,
+ FileBaseSourceOptions.TARGET_PATH)
.build();
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/SftpFileFactoryTest.java
b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/SftpFileFactoryTest.java
index d1964d3e98..746abaa0d6 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/SftpFileFactoryTest.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/SftpFileFactoryTest.java
@@ -17,6 +17,11 @@
package org.apache.seatunnel.connectors.seatunnel.file.sftp;
+import org.apache.seatunnel.api.configuration.util.Expression;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.configuration.util.RequiredOption;
+import
org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileSyncMode;
import
org.apache.seatunnel.connectors.seatunnel.file.sftp.sink.SftpFileSinkFactory;
import
org.apache.seatunnel.connectors.seatunnel.file.sftp.source.SftpFileSourceFactory;
@@ -27,7 +32,28 @@ class SftpFileFactoryTest {
@Test
void optionRule() {
- Assertions.assertNotNull((new SftpFileSourceFactory()).optionRule());
+ OptionRule optionRule = (new SftpFileSourceFactory()).optionRule();
+ Assertions.assertNotNull(optionRule);
+ Assertions.assertTrue(
+
optionRule.getOptionalOptions().contains(FileBaseSourceOptions.SYNC_MODE));
+ Assertions.assertTrue(
+
optionRule.getOptionalOptions().contains(FileBaseSourceOptions.TARGET_HADOOP_CONF));
+ Assertions.assertTrue(
+
optionRule.getOptionalOptions().contains(FileBaseSourceOptions.UPDATE_STRATEGY));
+ Assertions.assertTrue(
+
optionRule.getOptionalOptions().contains(FileBaseSourceOptions.COMPARE_MODE));
+
+ Expression expectExpression =
+ Expression.of(FileBaseSourceOptions.SYNC_MODE,
FileSyncMode.UPDATE);
+ Assertions.assertTrue(
+ optionRule.getRequiredOptions().stream()
+
.filter(RequiredOption.ConditionalRequiredOptions.class::isInstance)
+
.map(RequiredOption.ConditionalRequiredOptions.class::cast)
+ .filter(
+ required ->
+ required.getOptions()
+
.contains(FileBaseSourceOptions.TARGET_PATH))
+ .anyMatch(required ->
expectExpression.equals(required.getExpression())));
Assertions.assertNotNull((new SftpFileSinkFactory()).optionRule());
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java
index 575fe7f9ad..2e84ff4bf4 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java
@@ -75,6 +75,8 @@ public class FtpFileIT extends TestSuiteBase implements
TestResource {
private GenericContainer<?> ftpContainer;
+ private String ftpHomeDir;
+
private String ftpPassiveAddress;
private BiFunction<Integer, Integer, Integer[]> generateExposedPorts =
@@ -134,36 +136,39 @@ public class FtpFileIT extends TestSuiteBase implements
TestResource {
log.info("ftp container started");
+ ftpHomeDir = getFtpUserHomeDir();
+
ContainerUtil.copyFileIntoContainers(
"/json/e2e.json",
-
"/home/vsftpd/seatunnel/tmp/seatunnel/read/json/name=tyrantlucifer/hobby=coding/e2e.json",
+ ftpHomeDir +
"/tmp/seatunnel/read/json/name=tyrantlucifer/hobby=coding/e2e.json",
ftpContainer);
ContainerUtil.copyFileIntoContainers(
"/text/e2e.txt",
-
"/home/vsftpd/seatunnel/tmp/seatunnel/read/text/name=tyrantlucifer/hobby=coding/e2e.txt",
+ ftpHomeDir +
"/tmp/seatunnel/read/text/name=tyrantlucifer/hobby=coding/e2e.txt",
ftpContainer);
ContainerUtil.copyFileIntoContainers(
"/text/e2e-txt.zip",
-
"/home/vsftpd/seatunnel/tmp/seatunnel/read/zip/txt/single/e2e-txt.zip",
+ ftpHomeDir + "/tmp/seatunnel/read/zip/txt/single/e2e-txt.zip",
ftpContainer);
ContainerUtil.copyFileIntoContainers(
"/excel/e2e.xlsx",
-
"/home/vsftpd/seatunnel/tmp/seatunnel/read/excel/name=tyrantlucifer/hobby=coding/e2e.xlsx",
+ ftpHomeDir +
"/tmp/seatunnel/read/excel/name=tyrantlucifer/hobby=coding/e2e.xlsx",
ftpContainer);
ContainerUtil.copyFileIntoContainers(
"/excel/e2e.xlsx",
-
"/home/vsftpd/seatunnel/tmp/seatunnel/read/excel_filter/name=tyrantlucifer/hobby=coding/e2e_filter.xlsx",
+ ftpHomeDir
+ +
"/tmp/seatunnel/read/excel_filter/name=tyrantlucifer/hobby=coding/e2e_filter.xlsx",
ftpContainer);
ContainerUtil.copyFileIntoContainers(
- "/excel/e2e.xlsx", "/home/vsftpd/seatunnel/e2e.xlsx",
ftpContainer);
+ "/excel/e2e.xlsx", ftpHomeDir + "/e2e.xlsx", ftpContainer);
- ftpContainer.execInContainer("sh", "-c", "chmod -R 777
/home/vsftpd/seatunnel/");
- ftpContainer.execInContainer("sh", "-c", "chown -R ftp:ftp
/home/vsftpd/seatunnel/");
+ ftpContainer.execInContainer("sh", "-c", "chmod -R 777 " + ftpHomeDir
+ "/");
+ ftpContainer.execInContainer("sh", "-c", "chown -R ftp:ftp " +
ftpHomeDir + "/");
}
@TestTemplate
@@ -175,7 +180,7 @@ public class FtpFileIT extends TestSuiteBase implements
TestResource {
container, "/text/ftp_file_text_to_assert_for_passive.conf",
configParams);
assertJobExecution(container,
"/text/fake_to_ftp_file_text_for_passive.conf", configParams);
- String homePath = "/home/vsftpd/seatunnel/tmp/seatunnel/passive_text";
+ String homePath = ftpHomeDir + "/tmp/seatunnel/passive_text";
// test write ftp text file
Assertions.assertEquals(1, getFileListFromContainer(homePath).size());
@@ -194,7 +199,7 @@ public class FtpFileIT extends TestSuiteBase implements
TestResource {
Container.ExecResult execResult =
container.executeJob("/text/ftp_to_ftp_for_binary.conf");
Assertions.assertEquals(0, execResult.getExitCode(),
execResult.getStderr());
- String homePath = "/home/vsftpd/seatunnel/uploads/seatunnel";
+ String homePath = ftpHomeDir + "/uploads/seatunnel";
Assertions.assertEquals(1, getFileListFromContainer(homePath).size());
// Confirm data is written correctly
@@ -206,34 +211,65 @@ public class FtpFileIT extends TestSuiteBase implements
TestResource {
deleteFileFromContainer(homePath);
}
+    /**
+     * Runs the binary update-mode job three times against the same FTP source and target:
+     *
+     * <ol>
+     *   <li>initial run copies the source file to the target;
+     *   <li>after the target is rewritten (newer, same length) the distcp strategy skips it;
+     *   <li>after the source length changes the distcp strategy overwrites the target.
+     * </ol>
+     */
+    @TestTemplate
+    public void testFtpBinaryUpdateModeDistcp(TestContainer container)
+            throws IOException, InterruptedException {
+        String job = "/text/ftp_binary_update_distcp.conf";
+        String sourceFile = "/tmp/seatunnel/update/src/test.bin";
+        String targetFile = "/tmp/seatunnel/update/dst/test.bin";
+        resetUpdateTestPath();
+        try {
+            putFtpFile(sourceFile, "abc");
+            Container.ExecResult firstRun = container.executeJob(job);
+            Assertions.assertEquals(0, firstRun.getExitCode(), firstRun.getStderr());
+            Assertions.assertEquals("abc", readFtpFile(targetFile));
+
+            // Make target newer with same length, distcp strategy should SKIP overwrite.
+            putFtpFile(targetFile, "zzz");
+            Container.ExecResult secondRun = container.executeJob(job);
+            Assertions.assertEquals(0, secondRun.getExitCode(), secondRun.getStderr());
+            Assertions.assertEquals("zzz", readFtpFile(targetFile));
+
+            // Change source length, distcp strategy should COPY overwrite.
+            putFtpFile(sourceFile, "abcd");
+            Container.ExecResult thirdRun = container.executeJob(job);
+            Assertions.assertEquals(0, thirdRun.getExitCode(), thirdRun.getStderr());
+            Assertions.assertEquals("abcd", readFtpFile(targetFile));
+        } finally {
+            // Clean up even when an assertion fails so no update-test files are left behind.
+            deleteFileFromContainer(ftpHomeDir + "/tmp/seatunnel/update");
+        }
+    }
+
@TestTemplate
public void testFtpToAssertForJsonFilter(TestContainer container)
throws IOException, InterruptedException {
ContainerUtil.copyFileIntoContainers(
"/json/e2e.json",
-
"/home/vsftpd/seatunnel/tmp/seatunnel/read/filter/json/name=tyrantlucifer/hobby=coding/e2e.json",
+ ftpHomeDir
+ +
"/tmp/seatunnel/read/filter/json/name=tyrantlucifer/hobby=coding/e2e.json",
ftpContainer);
ContainerUtil.copyFileIntoContainers(
"/json/e2e.json",
-
"/home/vsftpd/seatunnel/tmp/seatunnel/read/filter/json2025/name=tyrantlucifer/hobby=coding/e2e_2025.json",
+ ftpHomeDir
+ +
"/tmp/seatunnel/read/filter/json2025/name=tyrantlucifer/hobby=coding/e2e_2025.json",
ftpContainer);
ContainerUtil.copyFileIntoContainers(
"/text/e2e.txt",
-
"/home/vsftpd/seatunnel/tmp/seatunnel/read/filter/json2025/name=tyrantlucifer/hobby=coding/e2e_2025.txt",
+ ftpHomeDir
+ +
"/tmp/seatunnel/read/filter/json2025/name=tyrantlucifer/hobby=coding/e2e_2025.txt",
ftpContainer);
ContainerUtil.copyFileIntoContainers(
"/json/e2e.json",
-
"/home/vsftpd/seatunnel/tmp/seatunnel/read/filter/json2024/name=tyrantlucifer/hobby=coding/e2e_2024.json",
+ ftpHomeDir
+ +
"/tmp/seatunnel/read/filter/json2024/name=tyrantlucifer/hobby=coding/e2e_2024.json",
ftpContainer);
ContainerUtil.copyFileIntoContainers(
"/text/e2e.txt",
-
"/home/vsftpd/seatunnel/tmp/seatunnel/read/filter/text/name=tyrantlucifer/hobby=coding/e2e.txt",
+ ftpHomeDir
+ +
"/tmp/seatunnel/read/filter/text/name=tyrantlucifer/hobby=coding/e2e.txt",
ftpContainer);
- ftpContainer.execInContainer("sh", "-c", "chmod -R 777
/home/vsftpd/seatunnel/");
- ftpContainer.execInContainer("sh", "-c", "chown -R ftp:ftp
/home/vsftpd/seatunnel/");
+ ftpContainer.execInContainer("sh", "-c", "chmod -R 777 " + ftpHomeDir
+ "/");
+ ftpContainer.execInContainer("sh", "-c", "chown -R ftp:ftp " +
ftpHomeDir + "/");
TestHelper helper = new TestHelper(container);
// -----filter based on the file directory at the same time, the
expression needs to start
@@ -244,7 +280,7 @@ public class FtpFileIT extends TestSuiteBase implements
TestResource {
helper.execute("/json/ftp_to_access_for_json_name_filter.conf");
// delete path
- String filterPath = "/home/vsftpd/seatunnel/tmp/seatunnel/read/filter";
+ String filterPath = ftpHomeDir + "/tmp/seatunnel/read/filter";
deleteFileFromContainer(filterPath);
}
@@ -280,6 +316,7 @@ public class FtpFileIT extends TestSuiteBase implements
TestResource {
// test write ftp json file
helper.execute("/json/fake_to_ftp_file_json.conf");
// test read ftp json file
+ ensureReadJsonInputFile();
helper.execute("/json/ftp_file_json_to_assert.conf");
// test write ftp parquet file
helper.execute("/parquet/fake_to_ftp_file_parquet.conf");
@@ -289,7 +326,7 @@ public class FtpFileIT extends TestSuiteBase implements
TestResource {
helper.execute("/excel/fake_source_to_ftp_root_path_excel.conf");
// test ftp source support multipleTable
- String homePath = "/home/vsftpd/seatunnel";
+ String homePath = ftpHomeDir;
String sink01 = "/tmp/seatunnel/json/sink/multiplesource/fake01";
String sink02 = "/tmp/seatunnel/json/sink/multiplesource/fake02";
deleteFileFromContainer(homePath + sink01);
@@ -308,7 +345,7 @@ public class FtpFileIT extends TestSuiteBase implements
TestResource {
String specialPath = "/tmp/seatunnel/test spaces";
String fileName = "file with spaces.txt";
String fullPath = specialPath + "/" + fileName;
- String homePath = "/home/vsftpd/seatunnel";
+ String homePath = ftpHomeDir;
String containerPath = homePath + fullPath;
try {
@@ -355,7 +392,7 @@ public class FtpFileIT extends TestSuiteBase implements
TestResource {
throws IOException, InterruptedException {
TestHelper helper = new TestHelper(container);
// test mult table and save_mode:RECREATE_SCHEMA DROP_DATA
- String homePath = "/home/vsftpd/seatunnel";
+ String homePath = ftpHomeDir;
String path1 = "/tmp/seatunnel_mult/text/source_1";
String path2 = "/tmp/seatunnel_mult/text/source_2";
deleteFileFromContainer(homePath + path1);
@@ -383,6 +420,138 @@ public class FtpFileIT extends TestSuiteBase implements TestResource {
Assertions.assertEquals(getFileListFromContainer(homePath + path4).size(), 2);
}
+ private void resetUpdateTestPath() throws IOException, InterruptedException {
+ deleteFileFromContainer(ftpHomeDir + "/tmp/seatunnel/update");
+ Container.ExecResult mkdirResult =
+ ftpContainer.execInContainer(
+ "sh",
+ "-c",
+ "mkdir -p "
+ + ftpHomeDir
+ + "/tmp/seatunnel/update/src "
+ + ftpHomeDir
+ + "/tmp/seatunnel/update/dst "
+ + ftpHomeDir
+ + "/tmp/seatunnel/update/tmp");
+ Assertions.assertEquals(0, mkdirResult.getExitCode(), mkdirResult.getStderr());
+ ftpContainer.execInContainer(
+ "sh", "-c", "chmod -R 777 " + ftpHomeDir + "/tmp/seatunnel/update || true");
+ ftpContainer.execInContainer(
+ "sh", "-c", "chown -R ftp:ftp " + ftpHomeDir + "/tmp/seatunnel/update || true");
+ }
+
+ private void putFtpFile(String ftpPath, String content)
+ throws IOException, InterruptedException {
+ String containerPath = ftpHomeDir + ftpPath;
+ String command =
+ "mkdir -p $(dirname '"
+ + containerPath
+ + "') && printf '"
+ + content
+ + "' > '"
+ + containerPath
+ + "' && chmod 666 '"
+ + containerPath
+ + "'";
+ Container.ExecResult putResult = ftpContainer.execInContainer("sh", "-c", command);
+ Assertions.assertEquals(0, putResult.getExitCode(), putResult.getStderr());
+ }
+
+ private String readFtpFile(String ftpPath) throws IOException, InterruptedException {
+ String containerPath = ftpHomeDir + ftpPath;
+ Container.ExecResult catResult =
+ ftpContainer.execInContainer("sh", "-c", "cat '" + containerPath + "'");
+ Assertions.assertEquals(0, catResult.getExitCode(), catResult.getStderr());
+ return catResult.getStdout() == null ? "" : catResult.getStdout().trim();
+ }
+
+ private String getFtpUserHomeDir() throws IOException, InterruptedException {
+ // Prefer vsftpd local_root as the real filesystem root used by FTP paths in test configs.
+ // In some images, FTP users are created as virtual users and may not exist in /etc/passwd.
+ try {
+ Container.ExecResult confResult =
+ ftpContainer.execInContainer("sh", "-c", "cat /etc/vsftpd/vsftpd.conf");
+ if (confResult.getExitCode() == 0 && StringUtils.isNotBlank(confResult.getStdout())) {
+ Properties properties = new Properties();
+ properties.load(new StringReader(confResult.getStdout()));
+ String localRoot = properties.getProperty("local_root");
+ if (StringUtils.isNotBlank(localRoot)) {
+ String resolved =
+ localRoot
+ .trim()
+ .replace("${FTP_USER}", USERNAME)
+ .replace("$FTP_USER", USERNAME)
+ .replace("${USER}", USERNAME)
+ .replace("$USER", USERNAME);
+ if (StringUtils.isNotBlank(resolved)) {
+ return resolved;
+ }
+ }
+ }
+ } catch (Exception e) {
+ log.warn("Failed to resolve ftp local_root from vsftpd.conf, fallback to default.", e);
+ }
+
+ // Fallback: resolve from /etc/passwd if user exists
+ Container.ExecResult homeResult =
+ ftpContainer.execInContainer(
+ "sh",
+ "-c",
+ "awk -F: '$1==\""
+ + USERNAME
+ + "\"{print $6}' /etc/passwd 2>/dev/null || true");
+ if (homeResult.getExitCode() == 0) {
+ String homeDir = homeResult.getStdout() == null ? "" : homeResult.getStdout().trim();
+ if (StringUtils.isNotBlank(homeDir)) {
+ return homeDir;
+ }
+ }
+
+ // Last resort: use default directory used by fauria/vsftpd.
+ String defaultRoot = "/home/vsftpd";
+ if (containerDirExists(defaultRoot)) {
+ log.warn(
+ "Cannot resolve ftp home directory for user: {}, fallback to {}",
+ USERNAME,
+ defaultRoot);
+ return defaultRoot;
+ }
+ String defaultUserRoot = defaultRoot + "/" + USERNAME;
+ log.warn(
+ "Cannot resolve ftp home directory for user: {}, fallback to {}",
+ USERNAME,
+ defaultUserRoot);
+ return defaultUserRoot;
+ }
+
+ private boolean containerDirExists(String path) throws IOException, InterruptedException {
+ Container.ExecResult result =
+ ftpContainer.execInContainer(
+ "sh", "-c", "test -d '" + path + "' && echo true || echo false");
+ return result.getExitCode() == 0
+ && StringUtils.equalsIgnoreCase(
+ (result.getStdout() == null ? "" : result.getStdout().trim()), "true");
+ }
+
+ private void ensureReadJsonInputFile() throws IOException, InterruptedException {
+ Container.ExecResult mkdirResult =
+ ftpContainer.execInContainer(
+ "sh",
+ "-c",
+ "mkdir -p "
+ + ftpHomeDir
+ + "/tmp/seatunnel/read/json/name=tyrantlucifer/hobby=coding");
+ Assertions.assertEquals(0, mkdirResult.getExitCode(), mkdirResult.getStderr());
+ ContainerUtil.copyFileIntoContainers(
+ "/json/e2e.json",
+ ftpHomeDir + "/tmp/seatunnel/read/json/name=tyrantlucifer/hobby=coding/e2e.json",
+ ftpContainer);
+ Container.ExecResult chmodResult =
+ ftpContainer.execInContainer(
+ "sh", "-c", "chmod -R 777 " + ftpHomeDir + "/tmp/seatunnel/read");
+ Assertions.assertEquals(0, chmodResult.getExitCode(), chmodResult.getStderr());
+ }
+
@SneakyThrows
private List<String> getFileListFromContainer(String path) {
String command = "ls -1 " + path;
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/text/ftp_binary_update_distcp.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/text/ftp_binary_update_distcp.conf
new file mode 100644
index 0000000000..7806f6a311
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/text/ftp_binary_update_distcp.conf
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FtpFile {
+ host = "ftp"
+ port = 21
+ user = seatunnel
+ password = pass
+
+ path = "/tmp/seatunnel/update/src"
+ file_format_type = "binary"
+
+ sync_mode = "update"
+ target_path = "/tmp/seatunnel/update/dst"
+ update_strategy = "distcp"
+ compare_mode = "len_mtime"
+ }
+}
+
+sink {
+ FtpFile {
+ host = "ftp"
+ port = 21
+ user = seatunnel
+ password = pass
+
+ path = "/tmp/seatunnel/update/dst"
+ tmp_path = "/tmp/seatunnel/update/tmp"
+ file_format_type = "binary"
+ }
+}
+
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
index bc1fb26d93..c6a4c39ded 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
@@ -40,6 +40,7 @@ import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.shaded.com.github.dockerjava.core.command.ExecStartResultCallback;
@@ -422,6 +423,62 @@ public class LocalFileIT extends TestSuiteBase {
helper.execute("/excel/local_excel_xlsx_gz_to_assert.conf");
}
+ @TestTemplate
+ @DisabledOnContainer(
+ value = {},
+ type = {EngineType.FLINK, EngineType.SPARK},
+ disabledReason =
+ "sync_mode=update needs to compare source/target on the same filesystem. Local filesystem is not shared between engine master/workers in Flink/Spark E2E.")
+ public void testLocalFileBinaryUpdateModeDistcp(TestContainer container)
+ throws IOException, InterruptedException {
+ resetUpdateTestPath();
+ putLocalFile("/tmp/seatunnel/update/src/test.bin", "abc");
+
+ TestHelper helper = new TestHelper(container);
+ helper.execute("/binary/local_file_binary_update_distcp.conf");
+ Assertions.assertEquals("abc", readLocalFile("/tmp/seatunnel/update/dst/test.bin"));
+
+ // Make target newer with same length, distcp strategy should SKIP overwrite.
+ putLocalFile("/tmp/seatunnel/update/dst/test.bin", "zzz");
+ helper.execute("/binary/local_file_binary_update_distcp.conf");
+ Assertions.assertEquals("zzz", readLocalFile("/tmp/seatunnel/update/dst/test.bin"));
+
+ // Change source length, distcp strategy should COPY overwrite.
+ putLocalFile("/tmp/seatunnel/update/src/test.bin", "abcd");
+ helper.execute("/binary/local_file_binary_update_distcp.conf");
+ Assertions.assertEquals("abcd", readLocalFile("/tmp/seatunnel/update/dst/test.bin"));
+
+ baseContainer.execInContainer("sh", "-c", "rm -rf /tmp/seatunnel/update");
+ }
+
+ @TestTemplate
+ @DisabledOnContainer(
+ value = {},
+ type = {EngineType.FLINK, EngineType.SPARK},
+ disabledReason =
+ "sync_mode=update needs to compare source/target on the same filesystem. Local filesystem is not shared between engine master/workers in Flink/Spark E2E.")
+ public void testLocalFileBinaryUpdateModeStrictChecksum(TestContainer container)
+ throws IOException, InterruptedException {
+ resetUpdateTestPath();
+ putLocalFile("/tmp/seatunnel/update/src/test.bin", "abc");
+
+ TestHelper helper = new TestHelper(container);
+ helper.execute("/binary/local_file_binary_update_strict_checksum.conf");
+ Assertions.assertEquals("abc", readLocalFile("/tmp/seatunnel/update/dst/test.bin"));
+
+ long firstMtimeSeconds = getLocalFileMtimeSeconds("/tmp/seatunnel/update/dst/test.bin");
+ Thread.sleep(1100);
+
+ helper.execute("/binary/local_file_binary_update_strict_checksum.conf");
+ long secondMtimeSeconds = getLocalFileMtimeSeconds("/tmp/seatunnel/update/dst/test.bin");
+ Assertions.assertEquals(
+ firstMtimeSeconds,
+ secondMtimeSeconds,
+ "Strict checksum should skip unchanged files and keep target mtime");
+
+ baseContainer.execInContainer("sh", "-c", "rm -rf /tmp/seatunnel/update");
+ }
+
@TestTemplate
@DisabledOnContainer(
value = {TestContainerId.SPARK_2_4},
@@ -487,6 +544,46 @@ public class LocalFileIT extends TestSuiteBase {
Assertions.assertFalse(localFileCatalog.tableExists(tablePath));
}
+ private void resetUpdateTestPath() throws IOException, InterruptedException {
+ Container.ExecResult result =
+ baseContainer.execInContainer(
+ "sh",
+ "-c",
+ "rm -rf /tmp/seatunnel/update && mkdir -p /tmp/seatunnel/update/src /tmp/seatunnel/update/dst /tmp/seatunnel/update/tmp");
+ Assertions.assertEquals(0, result.getExitCode(), result.getStderr());
+ }
+
+ private void putLocalFile(String filePath, String content)
+ throws IOException, InterruptedException {
+ String command =
+ "mkdir -p $(dirname '"
+ + filePath
+ + "') && printf '"
+ + content
+ + "' > '"
+ + filePath
+ + "' && chmod 666 '"
+ + filePath
+ + "'";
+ Container.ExecResult result = baseContainer.execInContainer("sh", "-c", command);
+ Assertions.assertEquals(0, result.getExitCode(), result.getStderr());
+ }
+
+ private String readLocalFile(String filePath) throws IOException, InterruptedException {
+ Container.ExecResult result =
+ baseContainer.execInContainer("sh", "-c", "cat '" + filePath + "'");
+ Assertions.assertEquals(0, result.getExitCode(), result.getStderr());
+ return result.getStdout() == null ? "" : result.getStdout().trim();
+ }
+
+ private long getLocalFileMtimeSeconds(String filePath)
+ throws IOException, InterruptedException {
+ Container.ExecResult result =
+ baseContainer.execInContainer("sh", "-c", "stat -c %Y '" + filePath + "'");
+ Assertions.assertEquals(0, result.getExitCode(), result.getStderr());
+ return Long.parseLong(result.getStdout().trim());
+ }
+
private Path convertToLzoFile(File file) throws IOException {
LzopCodec lzo = new LzopCodec();
Path path = Paths.get(file.getAbsolutePath() + ".lzo");
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/binary/local_file_binary_update_distcp.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/binary/local_file_binary_update_distcp.conf
new file mode 100644
index 0000000000..fc5eded175
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/binary/local_file_binary_update_distcp.conf
@@ -0,0 +1,42 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ LocalFile {
+ path = "/tmp/seatunnel/update/src"
+ file_format_type = "binary"
+
+ sync_mode = "update"
+ target_path = "/tmp/seatunnel/update/dst"
+ update_strategy = "distcp"
+ compare_mode = "len_mtime"
+ }
+}
+
+sink {
+ LocalFile {
+ path = "/tmp/seatunnel/update/dst"
+ tmp_path = "/tmp/seatunnel/update/tmp"
+ file_format_type = "binary"
+ }
+}
+
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/binary/local_file_binary_update_strict_checksum.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/binary/local_file_binary_update_strict_checksum.conf
new file mode 100644
index 0000000000..f6d82e5ef2
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/binary/local_file_binary_update_strict_checksum.conf
@@ -0,0 +1,42 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ LocalFile {
+ path = "/tmp/seatunnel/update/src"
+ file_format_type = "binary"
+
+ sync_mode = "update"
+ target_path = "/tmp/seatunnel/update/dst"
+ update_strategy = "strict"
+ compare_mode = "checksum"
+ }
+}
+
+sink {
+ LocalFile {
+ path = "/tmp/seatunnel/update/dst"
+ tmp_path = "/tmp/seatunnel/update/tmp"
+ file_format_type = "binary"
+ }
+}
+
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/fstp/SftpFileIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/fstp/SftpFileIT.java
index d8bc2efbb9..756759e078 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/fstp/SftpFileIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/fstp/SftpFileIT.java
@@ -31,6 +31,7 @@ import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.shaded.com.github.dockerjava.core.command.ExecStartResultCallback;
@@ -61,6 +62,8 @@ public class SftpFileIT extends TestSuiteBase implements TestResource {
private static final int SFTP_BIND_PORT = 2222;
+ private static final String SFTP_CONTAINER_HOME = "/home/seatunnel";
+
private static final String USERNAME = "seatunnel";
private static final String PASSWORD = "pass";
@@ -211,6 +214,32 @@ public class SftpFileIT extends TestSuiteBase implements TestResource {
Assertions.assertEquals(getFileListFromContainer(homePath + sink02).size(), 1);
}
+ @TestTemplate
+ public void testSftpBinaryUpdateModeDistcp(TestContainer container)
+ throws IOException, InterruptedException {
+ resetUpdateTestPath();
+ putSftpFile(SFTP_CONTAINER_HOME + "/tmp/seatunnel/update/src/test.bin", "abc");
+
+ TestHelper helper = new TestHelper(container);
+ helper.execute("/text/sftp_binary_update_distcp.conf");
+ Assertions.assertEquals(
+ "abc", readSftpFile(SFTP_CONTAINER_HOME + "/tmp/seatunnel/update/dst/test.bin"));
+
+ // Make target newer with same length, distcp strategy should SKIP overwrite.
+ putSftpFile(SFTP_CONTAINER_HOME + "/tmp/seatunnel/update/dst/test.bin", "zzz");
+ helper.execute("/text/sftp_binary_update_distcp.conf");
+ Assertions.assertEquals(
+ "zzz", readSftpFile(SFTP_CONTAINER_HOME + "/tmp/seatunnel/update/dst/test.bin"));
+
+ // Change source length, distcp strategy should COPY overwrite.
+ putSftpFile(SFTP_CONTAINER_HOME + "/tmp/seatunnel/update/src/test.bin", "abcd");
+ helper.execute("/text/sftp_binary_update_distcp.conf");
+ Assertions.assertEquals(
+ "abcd", readSftpFile(SFTP_CONTAINER_HOME + "/tmp/seatunnel/update/dst/test.bin"));
+
+ deleteFileFromContainer(SFTP_CONTAINER_HOME + "/tmp/seatunnel/update");
+ }
+
@TestTemplate
public void testMultipleTableAndSaveMode(TestContainer container)
throws IOException, InterruptedException {
@@ -244,6 +273,49 @@ public class SftpFileIT extends TestSuiteBase implements TestResource {
Assertions.assertEquals(getFileListFromContainer(homePath + path4).size(), 2);
}
+ private void resetUpdateTestPath() throws IOException, InterruptedException {
+ deleteFileFromContainer(SFTP_CONTAINER_HOME + "/tmp/seatunnel/update");
+ Container.ExecResult mkdirResult =
+ sftpContainer.execInContainer(
+ "sh",
+ "-c",
+ "mkdir -p "
+ + SFTP_CONTAINER_HOME
+ + "/tmp/seatunnel/update/src "
+ + SFTP_CONTAINER_HOME
+ + "/tmp/seatunnel/update/dst "
+ + SFTP_CONTAINER_HOME
+ + "/tmp/seatunnel/update/tmp");
+ Assertions.assertEquals(0, mkdirResult.getExitCode(), mkdirResult.getStderr());
+ sftpContainer.execInContainer(
+ "sh",
+ "-c",
+ "chmod -R 777 " + SFTP_CONTAINER_HOME + "/tmp/seatunnel/update || true");
+ }
+
+ private void putSftpFile(String containerPath, String content)
+ throws IOException, InterruptedException {
+ String command =
+ "mkdir -p $(dirname '"
+ + containerPath
+ + "') && printf '"
+ + content
+ + "' > '"
+ + containerPath
+ + "' && chmod 666 '"
+ + containerPath
+ + "'";
+ Container.ExecResult putResult = sftpContainer.execInContainer("sh", "-c", command);
+ Assertions.assertEquals(0, putResult.getExitCode(), putResult.getStderr());
+ }
+
+ private String readSftpFile(String containerPath) throws IOException, InterruptedException {
+ Container.ExecResult catResult =
+ sftpContainer.execInContainer("sh", "-c", "cat '" + containerPath + "'");
+ Assertions.assertEquals(0, catResult.getExitCode(), catResult.getStderr());
+ return catResult.getStdout() == null ? "" : catResult.getStdout().trim();
+ }
+
@SneakyThrows
private List<String> getFileListFromContainer(String path) {
String command = "ls -1 " + path;
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/text/sftp_binary_update_distcp.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/text/sftp_binary_update_distcp.conf
new file mode 100644
index 0000000000..f1dbc21721
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/text/sftp_binary_update_distcp.conf
@@ -0,0 +1,59 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+
+ # You can set spark configuration here
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 1
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ SftpFile {
+ host = "sftp"
+ port = 22
+ user = seatunnel
+ password = pass
+
+ path = "tmp/seatunnel/update/src"
+ file_format_type = "binary"
+
+ sync_mode = "update"
+ target_path = "tmp/seatunnel/update/dst"
+ update_strategy = "distcp"
+ compare_mode = "len_mtime"
+ }
+}
+
+sink {
+ SftpFile {
+ host = "sftp"
+ port = 22
+ user = seatunnel
+ password = pass
+
+ path = "tmp/seatunnel/update/dst"
+ tmp_path = "tmp/seatunnel/update/tmp"
+ file_format_type = "binary"
+ }
+}
+