This is an automated email from the ASF dual-hosted git repository.

lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 07a2125e53 [Feature][Connector-V2] Support sync_mode=update for FtpFile/SftpFile/LocalFile source (binary) (#10437)
07a2125e53 is described below

commit 07a2125e53e0d7349cb25f67ef0e79db50fe6616
Author: yzeng1618 <[email protected]>
AuthorDate: Wed Feb 4 21:24:12 2026 +0800

    [Feature][Connector-V2] Support sync_mode=update for FtpFile/SftpFile/LocalFile source (binary) (#10437)
    
    Co-authored-by: zengyi <[email protected]>
---
 docs/en/connectors/source/FtpFile.md               |  86 +++++++++
 docs/en/connectors/source/LocalFile.md             |  76 ++++++++
 docs/en/connectors/source/SftpFile.md              |  87 +++++++++
 docs/zh/connectors/source/FtpFile.md               |  88 ++++++++-
 docs/zh/connectors/source/LocalFile.md             |  76 ++++++++
 docs/zh/connectors/source/SftpFile.md              |  87 +++++++++
 .../file/source/reader/AbstractReadStrategy.java   | 200 ++++++++++++++-----
 .../file/source/reader/BinaryReadStrategy.java     |  37 ++--
 .../file/reader/BinaryReadStrategyTest.java        |  35 ++++
 .../source/reader/AbstractReadStrategyTest.java    |  24 +++
 .../file/ftp/source/FtpFileSourceFactory.java      |  10 +
 .../seatunnel/file/ftp/FtpFileFactoryTest.java     |  28 ++-
 .../file/local/source/LocalFileSourceFactory.java  |  10 +
 .../seatunnel/file/local/LocalFileFactoryTest.java |  28 ++-
 .../file/sftp/source/SftpFileSourceFactory.java    |  10 +
 .../seatunnel/file/sftp/SftpFileFactoryTest.java   |  28 ++-
 .../e2e/connector/file/ftp/FtpFileIT.java          | 211 +++++++++++++++++++--
 .../resources/text/ftp_binary_update_distcp.conf   |  52 +++++
 .../e2e/connector/file/local/LocalFileIT.java      |  97 ++++++++++
 .../binary/local_file_binary_update_distcp.conf    |  42 ++++
 .../local_file_binary_update_strict_checksum.conf  |  42 ++++
 .../e2e/connector/file/fstp/SftpFileIT.java        |  72 +++++++
 .../resources/text/sftp_binary_update_distcp.conf  |  59 ++++++
 23 files changed, 1402 insertions(+), 83 deletions(-)
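
For orientation, the option combination suggested by the file name `local_file_binary_update_strict_checksum.conf` might look roughly like the fragment below. This is a sketch assembled from the option names documented in this commit, not a copy of that test resource; the paths are placeholders borrowed from the LocalFile doc example.

```hocon
source {
  LocalFile {
    path = "/seatunnel/read/binary/"
    file_format_type = "binary"

    # Compare source files against the target and read only new/changed ones.
    sync_mode = "update"
    target_path = "/seatunnel/read/binary2/"

    # The docs state that compare_mode=checksum is only valid with update_strategy=strict.
    update_strategy = "strict"
    compare_mode = "checksum"
  }
}
```

Per the docs added here, if the filesystem cannot provide a checksum, SeaTunnel falls back to content comparison (more expensive) and logs a warning.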

diff --git a/docs/en/connectors/source/FtpFile.md b/docs/en/connectors/source/FtpFile.md
index 6b612cfa4c..48cccd96a7 100644
--- a/docs/en/connectors/source/FtpFile.md
+++ b/docs/en/connectors/source/FtpFile.md
@@ -76,6 +76,11 @@ If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you
 | null_format                 | string  | no       | -                           |
 | binary_chunk_size           | int     | no       | 1024                        |
 | binary_complete_file_mode   | boolean | no       | false                       |
+| sync_mode                   | string  | no       | full                        |
+| target_path                 | string  | no       | -                           |
+| target_hadoop_conf          | map     | no       | -                           |
+| update_strategy             | string  | no       | distcp                      |
+| compare_mode                | string  | no       | len_mtime                   |
 | common-options              |         | no       | -                           |
 | file_filter_modified_start  | string  | no       | -                           |
 | file_filter_modified_end    | string  | no       | -                           |
@@ -434,6 +439,46 @@ Only used when file_format_type is binary.
 
 Whether to read the complete file as a single chunk instead of splitting into chunks. When enabled, the entire file content will be read into memory at once. Default is false.
 
+### sync_mode [string]
+
+File sync mode. Supported values: `full` (default), `update`.
+When `update`, the source compares files between source/target and only reads new/changed files (currently only supports `file_format_type=binary`).
+
+**Performance considerations**
+- Update mode triggers an extra `getFileStatus` call on the target for each source file.
+- For remote file systems (FTP/SFTP), this adds per-file network overhead. It is not recommended for massive small-file scenarios.
+
+**Requirements / limitations**
+- `target_path` should typically align with sink `path` (same filesystem and same relative path layout).
+- When `update_strategy=distcp`, correctness depends on source/target clock synchronization.
+- When `compare_mode=checksum`, filesystem checksum support is required. If checksum is unavailable, SeaTunnel falls back to content comparison (more expensive) and logs a warning.
+
+Example:
+
+```hocon
+sync_mode = "update"
+file_format_type = "binary"
+target_path = "/path/to/your/sink/path"
+update_strategy = "distcp"
+compare_mode = "len_mtime"
+```
+
+### target_path [string]
+
+Only used when `sync_mode=update`. Target base path used for comparison (it should usually be the same as sink `path`).
+
+### target_hadoop_conf [map]
+
+Only used when `sync_mode=update`. Extra Hadoop configuration for target filesystem. You can set `fs.defaultFS` in this map to override target defaultFS.
+
+### update_strategy [string]
+
+Only used when `sync_mode=update`. Supported values: `distcp` (default), `strict`.
+
+### compare_mode [string]
+
+Only used when `sync_mode=update`. Supported values: `len_mtime` (default), `checksum` (only valid when `update_strategy=strict`).
+
 ### file_filter_modified_start [string]
 
 File modification time filter. The connector will filter some files base on the last modification start time (include start time). The default data format is `yyyy-MM-dd HH:mm:ss`.
@@ -570,6 +615,47 @@ sink {
 
 ```
 
+### Incremental Sync (sync_mode=update, binary)
+
+`sync_mode=update` compares files between source and `target_path`, then only reads new/changed files.
+In most cases, `target_path` should be aligned with sink `path` (same filesystem and same relative paths).
+
+```hocon
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FtpFile {
+    host = "192.168.31.48"
+    port = 21
+    user = tyrantlucifer
+    password = tianchao
+
+    path = "/seatunnel/read/binary/"
+    file_format_type = "binary"
+
+    sync_mode = "update"
+    target_path = "/seatunnel/read/binary2/"
+    update_strategy = "distcp"
+    compare_mode = "len_mtime"
+  }
+}
+sink {
+  FtpFile {
+    host = "192.168.31.48"
+    port = 21
+    user = tyrantlucifer
+    password = tianchao
+
+    path = "/seatunnel/read/binary2/"
+    tmp_path = "/seatunnel/read/binary2-tmp/"
+    file_format_type = "binary"
+  }
+}
+```
+
 ### Filter File
 
 ```hocon
diff --git a/docs/en/connectors/source/LocalFile.md b/docs/en/connectors/source/LocalFile.md
index 72c01544c3..3af8d35067 100644
--- a/docs/en/connectors/source/LocalFile.md
+++ b/docs/en/connectors/source/LocalFile.md
@@ -76,6 +76,11 @@ If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you
 | null_format                | string  | no       | -                                    |
 | binary_chunk_size          | int     | no       | 1024                                 |
 | binary_complete_file_mode  | boolean | no       | false                                |
+| sync_mode                  | string  | no       | full                                 |
+| target_path                | string  | no       | -                                    |
+| target_hadoop_conf         | map     | no       | -                                    |
+| update_strategy            | string  | no       | distcp                               |
+| compare_mode               | string  | no       | len_mtime                            |
 | common-options             |         | no       | -                                    |
 | tables_configs             | list    | no       | used to define a multiple table task |
 | file_filter_modified_start | string  | no       | -                                    |
@@ -410,6 +415,46 @@ Only used when file_format_type is binary.
 
 Whether to read the complete file as a single chunk instead of splitting into chunks. When enabled, the entire file content will be read into memory at once. Default is false.
 
+### sync_mode [string]
+
+File sync mode. Supported values: `full` (default), `update`.
+When `update`, the source compares files between source/target and only reads new/changed files (currently only supports `file_format_type=binary`).
+
+**Performance considerations**
+- Update mode triggers an extra `getFileStatus` call on the target for each source file.
+- It is not recommended for massive small-file scenarios.
+
+**Requirements / limitations**
+- `target_path` should typically align with sink `path` (same filesystem and same relative path layout).
+- When `update_strategy=distcp`, correctness depends on source/target clock synchronization.
+- When `compare_mode=checksum`, filesystem checksum support is required. If checksum is unavailable, SeaTunnel falls back to content comparison (more expensive) and logs a warning.
+
+Example:
+
+```hocon
+sync_mode = "update"
+file_format_type = "binary"
+target_path = "/path/to/your/sink/path"
+update_strategy = "distcp"
+compare_mode = "len_mtime"
+```
+
+### target_path [string]
+
+Only used when `sync_mode=update`. Target base path used for comparison (it should usually be the same as sink `path`).
+
+### target_hadoop_conf [map]
+
+Only used when `sync_mode=update`. Extra Hadoop configuration for target filesystem. You can set `fs.defaultFS` in this map to override target defaultFS.
+
+### update_strategy [string]
+
+Only used when `sync_mode=update`. Supported values: `distcp` (default), `strict`.
+
+### compare_mode [string]
+
+Only used when `sync_mode=update`. Supported values: `len_mtime` (default), `checksum` (only valid when `update_strategy=strict`).
+
 ### file_filter_modified_start [string]
 
 File modification time filter. The connector will filter some files base on the last modification start time (include start time). The default data format is `yyyy-MM-dd HH:mm:ss`.
@@ -575,6 +620,37 @@ sink {
 
 ```
 
+### Incremental Sync (sync_mode=update, binary)
+
+`sync_mode=update` compares files between source and `target_path`, then only reads new/changed files.
+In most cases, `target_path` should be aligned with sink `path` (same filesystem and same relative paths).
+
+```hocon
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  LocalFile {
+    path = "/seatunnel/read/binary/"
+    file_format_type = "binary"
+
+    sync_mode = "update"
+    target_path = "/seatunnel/read/binary2/"
+    update_strategy = "distcp"
+    compare_mode = "len_mtime"
+  }
+}
+sink {
+  LocalFile {
+    path = "/seatunnel/read/binary2/"
+    tmp_path = "/seatunnel/read/binary2-tmp/"
+    file_format_type = "binary"
+  }
+}
+```
+
 ### Filter File
 
 ```hocon
diff --git a/docs/en/connectors/source/SftpFile.md b/docs/en/connectors/source/SftpFile.md
index cc7ae6c94e..9b8b173188 100644
--- a/docs/en/connectors/source/SftpFile.md
+++ b/docs/en/connectors/source/SftpFile.md
@@ -107,6 +107,11 @@ The File does not have a specific type list, and we can indicate which SeaTunnel
 | null_format                | string  | no       | -                             | Only used when file_format_type is text. null_format to define which strings can be represented as null. e.g: `\N` |
 | binary_chunk_size          | int     | no       | 1024                          | Only used when file_format_type is binary. The chunk size (in bytes) for reading binary files. Default is 1024 bytes. Larger values may improve performance for large files but use more memory. |
 | binary_complete_file_mode  | boolean | no       | false                         | Only used when file_format_type is binary. Whether to read the complete file as a single chunk instead of splitting into chunks. When enabled, the entire file content will be read into memory at once. Default is false. |
+| sync_mode                  | string  | no       | full                          | File sync mode. Supported values: `full`, `update`. When `update`, the source compares files between source/target and only reads new/changed files (currently only supports `file_format_type=binary`). |
+| target_path                | string  | no       | -                             | Only used when `sync_mode=update`. Target base path used for comparison (it should usually be the same as sink `path`). |
+| target_hadoop_conf         | map     | no       | -                             | Only used when `sync_mode=update`. Extra Hadoop configuration for target filesystem. You can set `fs.defaultFS` in this map to override target defaultFS. |
+| update_strategy            | string  | no       | distcp                        | Only used when `sync_mode=update`. Supported values: `distcp` (default), `strict`. |
+| compare_mode               | string  | no       | len_mtime                     | Only used when `sync_mode=update`. Supported values: `len_mtime` (default), `checksum` (only valid when `update_strategy=strict`). |
 | common-options             |         | No       | -                             | Source plugin common parameters, please refer to [Source Common Options](../common-options/source-common-options.md) for details. |
 | file_filter_modified_start | string  | no       | -                             | File modification time filter. The connector will filter some files base on the last modification start time (include start time). The default data format is `yyyy-MM-dd HH:mm:ss`. |
 | file_filter_modified_end   | string  | no       | -                             | File modification time filter. The connector will filter some files base on the last modification end time (not include end time). The default data format is `yyyy-MM-dd HH:mm:ss`. |
@@ -292,6 +297,46 @@ Only used when file_format_type is binary.
 
 Whether to read the complete file as a single chunk instead of splitting into chunks. When enabled, the entire file content will be read into memory at once. Default is false.
 
+### sync_mode [string]
+
+File sync mode. Supported values: `full` (default), `update`.
+When `update`, the source compares files between source/target and only reads new/changed files (currently only supports `file_format_type=binary`).
+
+**Performance considerations**
+- Update mode triggers an extra `getFileStatus` call on the target for each source file.
+- For remote file systems (FTP/SFTP), this adds per-file network overhead. It is not recommended for massive small-file scenarios.
+
+**Requirements / limitations**
+- `target_path` should typically align with sink `path` (same filesystem and same relative path layout).
+- When `update_strategy=distcp`, correctness depends on source/target clock synchronization.
+- When `compare_mode=checksum`, filesystem checksum support is required. If checksum is unavailable, SeaTunnel falls back to content comparison (more expensive) and logs a warning.
+
+Example:
+
+```hocon
+sync_mode = "update"
+file_format_type = "binary"
+target_path = "/path/to/your/sink/path"
+update_strategy = "distcp"
+compare_mode = "len_mtime"
+```
+
+### target_path [string]
+
+Only used when `sync_mode=update`. Target base path used for comparison (it should usually be the same as sink `path`).
+
+### target_hadoop_conf [map]
+
+Only used when `sync_mode=update`. Extra Hadoop configuration for target filesystem. You can set `fs.defaultFS` in this map to override target defaultFS.
+
+### update_strategy [string]
+
+Only used when `sync_mode=update`. Supported values: `distcp` (default), `strict`.
+
+### compare_mode [string]
+
+Only used when `sync_mode=update`. Supported values: `len_mtime` (default), `checksum` (only valid when `update_strategy=strict`).
+
 ### quote_char [string]
 
 A single character that encloses CSV fields, allowing fields with commas, line breaks, or quotes to be read correctly.
@@ -439,6 +484,48 @@ sink {
   }
 }
 ```
+
+### Incremental Sync (sync_mode=update, binary)
+
+`sync_mode=update` compares files between source and `target_path`, then only reads new/changed files.
+In most cases, `target_path` should be aligned with sink `path` (same filesystem and same relative paths).
+
+```hocon
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  SftpFile {
+    host = "sftp"
+    port = 22
+    user = seatunnel
+    password = pass
+
+    path = "tmp/seatunnel/update/src"
+    file_format_type = "binary"
+
+    sync_mode = "update"
+    target_path = "tmp/seatunnel/update/dst"
+    update_strategy = "distcp"
+    compare_mode = "len_mtime"
+  }
+}
+
+sink {
+  SftpFile {
+    host = "sftp"
+    port = 22
+    user = seatunnel
+    password = pass
+
+    path = "tmp/seatunnel/update/dst"
+    tmp_path = "tmp/seatunnel/update/tmp"
+    file_format_type = "binary"
+  }
+}
+```
 ## Changelog
 
 <ChangeLog />
diff --git a/docs/zh/connectors/source/FtpFile.md b/docs/zh/connectors/source/FtpFile.md
index 9c10e6d730..1dd2e61ce6 100644
--- a/docs/zh/connectors/source/FtpFile.md
+++ b/docs/zh/connectors/source/FtpFile.md
@@ -72,6 +72,11 @@ import ChangeLog from '../changelog/connector-file-ftp.md';
 | null_format                 | string  | 否    | -                   |
 | binary_chunk_size           | int     | 否    | 1024                |
 | binary_complete_file_mode   | boolean | 否    | false               |
+| sync_mode                   | string  | 否    | full                |
+| target_path                 | string  | 否    | -                   |
+| target_hadoop_conf          | map     | 否    | -                   |
+| update_strategy             | string  | 否    | distcp              |
+| compare_mode                | string  | 否    | len_mtime           |
 | common-options              |         | 否    | -                   |
 | file_filter_modified_start  | string  | 否    | -                   | 
 | file_filter_modified_end    | string  | 否    | -                   | 
@@ -404,6 +409,46 @@ SeaTunnel 将从源文件中跳过前 2 行。
 
 是否将完整文件作为单个块读取,而不是分割成块。启用时,整个文件内容将一次性读入内存。默认为 false。
 
+### sync_mode [string]
+
+文件同步模式,支持:`full`(默认)、`update`。
+当 `update` 时,对源/目标进行对比,只读取新增/变更文件(目前仅支持 `file_format_type=binary`)。
+
+**性能注意事项**
+- Update 模式会对每个源文件额外发起一次到目标端的 `getFileStatus` 用于对比。
+- 对于远程文件系统(FTP/SFTP),会带来按文件的网络开销,不建议用于海量小文件场景。
+
+**要求 / 限制**
+- `target_path` 通常应与 sink 的 `path` 一致(同一文件系统且相对路径结构一致)。
+- 使用 `update_strategy=distcp` 时,依赖源/目标端时钟同步,否则可能误判。
+- 使用 `compare_mode=checksum` 时,需要文件系统支持 checksum;若无法获取 checksum,SeaTunnel 会降级为内容比较(开销更大)并打印告警日志。
+
+示例:
+
+```hocon
+sync_mode = "update"
+file_format_type = "binary"
+target_path = "/path/to/your/sink/path"
+update_strategy = "distcp"
+compare_mode = "len_mtime"
+```
+
+### target_path [string]
+
+仅在 `sync_mode=update` 时使用。目标端基础路径(通常应与 sink 的 `path` 一致),用于对比同相对路径文件。
+
+### target_hadoop_conf [map]
+
+仅在 `sync_mode=update` 时使用。目标端 Hadoop 配置(可选),可在其中设置 `fs.defaultFS` 覆盖目标 defaultFS。
+
+### update_strategy [string]
+
+仅在 `sync_mode=update` 时使用。支持:`distcp`(默认)、`strict`。
+
+### compare_mode [string]
+
+仅在 `sync_mode=update` 时使用。支持:`len_mtime`(默认)、`checksum`(仅在 `update_strategy=strict` 时可用)。
+
 ### file_filter_modified_start
 
 按照最后修改时间过滤文件。 要过滤的开始时间(包括改时间),时间格式是:`yyyy-MM-dd HH:mm:ss`。
@@ -540,6 +585,47 @@ sink {
 
 ```
 
+### 增量同步(sync_mode=update,仅 binary)
+
+`sync_mode=update` 会对比 source 与 `target_path`,仅读取新增/变更文件。
+多数情况下,`target_path` 需要与 sink 的 `path` 对齐(同一文件系统、相同相对路径)。
+
+```hocon
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FtpFile {
+    host = "192.168.31.48"
+    port = 21
+    user = tyrantlucifer
+    password = tianchao
+
+    path = "/seatunnel/read/binary/"
+    file_format_type = "binary"
+
+    sync_mode = "update"
+    target_path = "/seatunnel/read/binary2/"
+    update_strategy = "distcp"
+    compare_mode = "len_mtime"
+  }
+}
+sink {
+  FtpFile {
+    host = "192.168.31.48"
+    port = 21
+    user = tyrantlucifer
+    password = tianchao
+
+    path = "/seatunnel/read/binary2/"
+    tmp_path = "/seatunnel/read/binary2-tmp/"
+    file_format_type = "binary"
+  }
+}
+```
+
 ### 过滤文件
 
 ```hocon
@@ -569,4 +655,4 @@ sink {
 
 ## 变更日志
 
-<ChangeLog />
\ No newline at end of file
+<ChangeLog />
diff --git a/docs/zh/connectors/source/LocalFile.md b/docs/zh/connectors/source/LocalFile.md
index ef6dabed25..22dd9391a4 100644
--- a/docs/zh/connectors/source/LocalFile.md
+++ b/docs/zh/connectors/source/LocalFile.md
@@ -76,6 +76,11 @@ import ChangeLog from '../changelog/connector-file-local.md';
 | null_format                | string  | 否    | -                   |
 | binary_chunk_size          | int     | 否    | 1024                |
 | binary_complete_file_mode  | boolean | 否    | false               |
+| sync_mode                  | string  | 否    | full                |
+| target_path                | string  | 否    | -                   |
+| target_hadoop_conf         | map     | 否    | -                   |
+| update_strategy            | string  | 否    | distcp              |
+| compare_mode               | string  | 否    | len_mtime           |
 | common-options             |         | 否    | -                   |
 | tables_configs             | list    | 否    | 用于定义多表任务            |
 | file_filter_modified_start | string  | 否    | -                   | 
@@ -411,6 +416,46 @@ null_format 定义哪些字符串可以表示为 null。
 
 是否将完整文件作为单个块读取,而不是分割成块。启用时,整个文件内容将一次性读入内存。默认为 false。
 
+### sync_mode [string]
+
+文件同步模式,支持:`full`(默认)、`update`。
+当 `update` 时,对源/目标进行对比,只读取新增/变更文件(目前仅支持 `file_format_type=binary`)。
+
+**性能注意事项**
+- Update 模式会对每个源文件额外发起一次到目标端的 `getFileStatus` 用于对比。
+- 不建议用于海量小文件场景。
+
+**要求 / 限制**
+- `target_path` 通常应与 sink 的 `path` 一致(同一文件系统且相对路径结构一致)。
+- 使用 `update_strategy=distcp` 时,依赖源/目标端时钟同步,否则可能误判。
+- 使用 `compare_mode=checksum` 时,需要文件系统支持 checksum;若无法获取 checksum,SeaTunnel 会降级为内容比较(开销更大)并打印告警日志。
+
+示例:
+
+```hocon
+sync_mode = "update"
+file_format_type = "binary"
+target_path = "/path/to/your/sink/path"
+update_strategy = "distcp"
+compare_mode = "len_mtime"
+```
+
+### target_path [string]
+
+仅在 `sync_mode=update` 时使用。目标端基础路径(通常应与 sink 的 `path` 一致),用于对比同相对路径文件。
+
+### target_hadoop_conf [map]
+
+仅在 `sync_mode=update` 时使用。目标端 Hadoop 配置(可选),可在其中设置 `fs.defaultFS` 覆盖目标 defaultFS。
+
+### update_strategy [string]
+
+仅在 `sync_mode=update` 时使用。支持:`distcp`(默认)、`strict`。
+
+### compare_mode [string]
+
+仅在 `sync_mode=update` 时使用。支持:`len_mtime`(默认)、`checksum`(仅在 `update_strategy=strict` 时可用)。
+
 ### file_filter_modified_start
 
 按照最后修改时间过滤文件。 要过滤的开始时间(包括改时间),时间格式是:`yyyy-MM-dd HH:mm:ss`。
@@ -576,6 +621,37 @@ sink {
 
 ```
 
+### 增量同步(sync_mode=update,仅 binary)
+
+`sync_mode=update` 会对比 source 与 `target_path`,仅读取新增/变更文件。
+多数情况下,`target_path` 需要与 sink 的 `path` 对齐(同一文件系统、相同相对路径)。
+
+```hocon
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  LocalFile {
+    path = "/seatunnel/read/binary/"
+    file_format_type = "binary"
+
+    sync_mode = "update"
+    target_path = "/seatunnel/read/binary2/"
+    update_strategy = "distcp"
+    compare_mode = "len_mtime"
+  }
+}
+sink {
+  LocalFile {
+    path = "/seatunnel/read/binary2/"
+    tmp_path = "/seatunnel/read/binary2-tmp/"
+    file_format_type = "binary"
+  }
+}
+```
+
 ### 过滤文件
 
 ```hocon
diff --git a/docs/zh/connectors/source/SftpFile.md b/docs/zh/connectors/source/SftpFile.md
index 42dcb346fa..7de83e3af9 100644
--- a/docs/zh/connectors/source/SftpFile.md
+++ b/docs/zh/connectors/source/SftpFile.md
@@ -107,6 +107,11 @@ import ChangeLog from '../changelog/connector-file-sftp.md';
 | null_format                | string  | 否    | -                   | 仅在file_format_type为text时使用。null_format用于定义哪些字符串可以表示为null。例如:`\N` |
 | binary_chunk_size          | int     | 否    | 1024                | 仅在file_format_type为binary时使用。读取二进制文件的块大小(以字节为单位)。默认为1024字节。较大的值可能会提高大文件的性能,但会使用更多内存。 |
 | binary_complete_file_mode  | boolean | 否    | false               | 仅在file_format_type为binary时使用。是否将完整文件作为单个块读取,而不是分割成块。启用时,整个文件内容将一次性读入内存。默认为false。 |
+| sync_mode                  | string  | 否    | full                | 文件同步模式,支持:`full`(默认)、`update`。当 `update` 时,对源/目标进行对比,只读取新增/变更文件(目前仅支持 `file_format_type=binary`)。 |
+| target_path                | string  | 否    | -                   | 仅在 `sync_mode=update` 时使用。目标端基础路径(通常应与 sink 的 `path` 一致),用于对比同相对路径文件。 |
+| target_hadoop_conf         | map     | 否    | -                   | 仅在 `sync_mode=update` 时使用。目标端 Hadoop 配置(可选),可在其中设置 `fs.defaultFS` 覆盖目标 defaultFS。 |
+| update_strategy            | string  | 否    | distcp              | 仅在 `sync_mode=update` 时使用。支持:`distcp`(默认)、`strict`。 |
+| compare_mode               | string  | 否    | len_mtime           | 仅在 `sync_mode=update` 时使用。支持:`len_mtime`(默认)、`checksum`(仅在 `update_strategy=strict` 时可用)。 |
 | common-options             |         | 否    | -                   | 数据源插件通用参数,请参考[数据源通用选项](../common-options/source-common-options.md)了解详情。 |
 | file_filter_modified_start | string  | 否    | -                   | 按照最后修改时间过滤文件。 要过滤的开始时间(包括改时间),时间格式是:`yyyy-MM-dd HH:mm:ss` |
 | file_filter_modified_end   | string  | 否    | -                   | 按照最后修改时间过滤文件。 要过滤的结束时间(不包括改时间),时间格式是:`yyyy-MM-dd HH:mm:ss` |
@@ -291,6 +296,46 @@ markdown 解析器提取各种元素,包括标题、段落、列表、代码
 
 是否将完整文件作为单个块读取,而不是分割成块。启用时,整个文件内容将一次性读入内存。默认为false。
 
+### sync_mode [string]
+
+文件同步模式,支持:`full`(默认)、`update`。
+当 `update` 时,对源/目标进行对比,只读取新增/变更文件(目前仅支持 `file_format_type=binary`)。
+
+**性能注意事项**
+- Update 模式会对每个源文件额外发起一次到目标端的 `getFileStatus` 用于对比。
+- 对于远程文件系统(FTP/SFTP),会带来按文件的网络开销,不建议用于海量小文件场景。
+
+**要求 / 限制**
+- `target_path` 通常应与 sink 的 `path` 一致(同一文件系统且相对路径结构一致)。
+- 使用 `update_strategy=distcp` 时,依赖源/目标端时钟同步,否则可能误判。
+- 使用 `compare_mode=checksum` 时,需要文件系统支持 checksum;若无法获取 checksum,SeaTunnel 会降级为内容比较(开销更大)并打印告警日志。
+
+示例:
+
+```hocon
+sync_mode = "update"
+file_format_type = "binary"
+target_path = "/path/to/your/sink/path"
+update_strategy = "distcp"
+compare_mode = "len_mtime"
+```
+
+### target_path [string]
+
+仅在 `sync_mode=update` 时使用。目标端基础路径(通常应与 sink 的 `path` 一致),用于对比同相对路径文件。
+
+### target_hadoop_conf [map]
+
+仅在 `sync_mode=update` 时使用。目标端 Hadoop 配置(可选),可在其中设置 `fs.defaultFS` 覆盖目标 defaultFS。
+
+### update_strategy [string]
+
+仅在 `sync_mode=update` 时使用。支持:`distcp`(默认)、`strict`。
+
+### compare_mode [string]
+
+仅在 `sync_mode=update` 时使用。支持:`len_mtime`(默认)、`checksum`(仅在 `update_strategy=strict` 时可用)。
+
 ### schema [config]
 
 #### fields [Config]
@@ -430,6 +475,48 @@ sink {
   }
 }
 ```
+
+### 增量同步(sync_mode=update,仅 binary)
+
+`sync_mode=update` 会对比 source 与 `target_path`,仅读取新增/变更文件。
+多数情况下,`target_path` 需要与 sink 的 `path` 对齐(同一文件系统、相同相对路径)。
+
+```hocon
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  SftpFile {
+    host = "sftp"
+    port = 22
+    user = seatunnel
+    password = pass
+
+    path = "tmp/seatunnel/update/src"
+    file_format_type = "binary"
+
+    sync_mode = "update"
+    target_path = "tmp/seatunnel/update/dst"
+    update_strategy = "distcp"
+    compare_mode = "len_mtime"
+  }
+}
+
+sink {
+  SftpFile {
+    host = "sftp"
+    port = 22
+    user = seatunnel
+    password = pass
+
+    path = "tmp/seatunnel/update/dst"
+    tmp_path = "tmp/seatunnel/update/tmp"
+    file_format_type = "binary"
+  }
+}
+```
 ## 变更日志
 
 <ChangeLog />
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
index 80b88cd170..79bc6badf3 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
@@ -124,6 +124,11 @@ public abstract class AbstractReadStrategy implements 
ReadStrategy {
     protected transient boolean shareTargetFileSystemProxy;
     protected transient boolean checksumUnavailableWarned;
 
+    private static final class UpdateModeStats {
+        private long scanned;
+        private long skipped;
+    }
+
     @Override
     public void init(HadoopConf conf) {
         this.hadoopConf = conf;
@@ -147,46 +152,74 @@ public abstract class AbstractReadStrategy implements ReadStrategy {
     @Override
     public List<String> getFileNamesByPath(String path) throws IOException {
         ArrayList<String> fileNames = new ArrayList<>();
+        UpdateModeStats updateModeStats = enableUpdateSync ? new UpdateModeStats() : null;
+        collectFileNamesByPath(path, fileNames, updateModeStats);
+        if (updateModeStats != null) {
+            log.info(
+                    "Update sync mode statistics: scanned={}, skipped={}, to_sync={}",
+                    updateModeStats.scanned,
+                    updateModeStats.skipped,
+                    updateModeStats.scanned - updateModeStats.skipped);
+        }
+        return fileNames;
+    }
+
+    private void collectFileNamesByPath(
+            String path, List<String> fileNames, UpdateModeStats updateModeStats)
+            throws IOException {
         FileStatus[] stats = hadoopFileSystemProxy.listStatus(path);
         for (FileStatus fileStatus : stats) {
             if (fileStatus.isDirectory()) {
                 // skip hidden tmp directory, such as .hive-staging_hive
                 if (!fileStatus.getPath().getName().startsWith(".")) {
-                    fileNames.addAll(getFileNamesByPath(fileStatus.getPath().toString()));
+                    collectFileNamesByPath(
+                            fileStatus.getPath().toString(), fileNames, updateModeStats);
                 }
                 continue;
             }
-            if (fileStatus.isFile() && filterFileByPattern(fileStatus) && fileStatus.getLen() > 0) {
-                // filter '_SUCCESS' file
-                if (!fileStatus.getPath().getName().equals("_SUCCESS")
-                        && !fileStatus.getPath().getName().startsWith(".")
-                        && filterFileByModificationDate(fileStatus)) {
-
-                    String filePath = fileStatus.getPath().toString();
-                    if (!readPartitions.isEmpty()) {
-                        for (String readPartition : readPartitions) {
-                            if (filePath.contains(readPartition)) {
-                                if (shouldSyncFileInUpdateMode(fileStatus)) {
-                                    fileNames.add(filePath);
-                                    this.fileNames.add(filePath);
-                                }
-                                break;
-                            }
-                        }
-                    } else {
-                        if (shouldSyncFileInUpdateMode(fileStatus)) {
-                            fileNames.add(filePath);
-                            this.fileNames.add(filePath);
-                        }
+            if (!fileStatus.isFile()
+                    || !filterFileByPattern(fileStatus)
+                    || fileStatus.getLen() <= 0) {
+                continue;
+            }
+
+            // filter '_SUCCESS' file and hidden files
+            String fileName = fileStatus.getPath().getName();
+            if (fileName.equals("_SUCCESS")
+                    || fileName.startsWith(".")
+                    || !filterFileByModificationDate(fileStatus)) {
+                continue;
+            }
+
+            String filePath = fileStatus.getPath().toString();
+            if (StringUtils.isNotEmpty(filenameExtension)
+                    && !filePath.endsWith(filenameExtension)) {
+                continue;
+            }
+
+            if (!readPartitions.isEmpty()) {
+                boolean partitionMatched = false;
+                for (String readPartition : readPartitions) {
+                    if (filePath.contains(readPartition)) {
+                        partitionMatched = true;
+                        break;
                     }
                 }
+                if (!partitionMatched) {
+                    continue;
+                }
+            }
+
+            if (updateModeStats != null) {
+                updateModeStats.scanned++;
+            }
+            if (shouldSyncFileInUpdateMode(fileStatus)) {
+                fileNames.add(filePath);
+                this.fileNames.add(filePath);
+            } else if (updateModeStats != null) {
+                updateModeStats.skipped++;
             }
         }
-        if (StringUtils.isNotEmpty(filenameExtension)) {
-            this.fileNames.removeIf(fileName -> !fileName.endsWith(filenameExtension));
-            fileNames.removeIf(fileName -> !fileName.endsWith(filenameExtension));
-        }
-        return fileNames;
     }
 
     private Date getFileModifiedDate(String modifiedDate) {
@@ -304,6 +337,12 @@ public abstract class AbstractReadStrategy implements ReadStrategy {
         enableUpdateSync = syncMode == FileSyncMode.UPDATE;
         if (enableUpdateSync) {
             validateUpdateSyncConfig(pluginConfig);
+            log.info(
+                    "Update sync mode enabled: source_path={}, target_path={}, update_strategy={}, compare_mode={}",
+                    maskUriUserInfo(sourceRootPath),
+                    maskUriUserInfo(targetPath),
+                    updateStrategy.name().toLowerCase(Locale.ROOT),
+                    compareMode.name().toLowerCase(Locale.ROOT));
         }
     }
 
@@ -711,37 +750,73 @@ public abstract class AbstractReadStrategy implements ReadStrategy {
         long targetMtime = targetFileStatus.getModificationTime();
 
         if (updateStrategy == FileUpdateStrategy.DISTCP) {
-            return sourceMtime > targetMtime;
+            if (sourceMtime > targetMtime) {
+                return true;
+            }
+            logUpdateModeSkip(sourceFilePath, targetFilePath, "distcp: target newer or same");
+            return false;
         }
 
         if (updateStrategy == FileUpdateStrategy.STRICT) {
             if (compareMode == FileCompareMode.LEN_MTIME) {
-                return sourceMtime != targetMtime;
+                if (sourceMtime != targetMtime) {
+                    return true;
+                }
+                logUpdateModeSkip(
+                        sourceFilePath, targetFilePath, "strict len_mtime: len and mtime equal");
+                return false;
             }
             if (compareMode == FileCompareMode.CHECKSUM) {
-                FileChecksum sourceChecksum = hadoopFileSystemProxy.getFileChecksum(sourceFilePath);
-                FileChecksum targetChecksum =
-                        targetHadoopFileSystemProxy.getFileChecksum(targetFilePath);
-                if (sourceChecksum == null || targetChecksum == null) {
+                FileChecksum sourceChecksum = null;
+                FileChecksum targetChecksum = null;
+                Exception checksumException = null;
+                try {
+                    sourceChecksum = hadoopFileSystemProxy.getFileChecksum(sourceFilePath);
+                    targetChecksum = targetHadoopFileSystemProxy.getFileChecksum(targetFilePath);
+                } catch (Exception e) {
+                    checksumException = e;
+                }
+
+                if (checksumException != null || sourceChecksum == null || targetChecksum == null) {
                     if (!checksumUnavailableWarned) {
-                        log.warn(
-                                "File checksum is not available, fallback to content comparison. source={}, target={}",
-                                sourceFilePath,
-                                targetFilePath);
+                        if (checksumException == null) {
+                            log.warn(
+                                    "File checksum is not available, fallback to content comparison. source={}, target={}",
+                                    maskUriUserInfo(sourceFilePath),
+                                    maskUriUserInfo(targetFilePath));
+                        } else {
+                            log.warn(
+                                    "File checksum is not available, fallback to content comparison. source={}, target={}",
+                                    maskUriUserInfo(sourceFilePath),
+                                    maskUriUserInfo(targetFilePath),
+                                    checksumException);
+                        }
                         checksumUnavailableWarned = true;
                     }
                     try {
-                        return !fileContentEquals(sourceFilePath, targetFilePath);
+                        boolean sameContent = fileContentEquals(sourceFilePath, targetFilePath);
+                        if (sameContent) {
+                            logUpdateModeSkip(
+                                    sourceFilePath,
+                                    targetFilePath,
+                                    "strict checksum: content equal (checksum unavailable)");
+                        }
+                        return !sameContent;
                     } catch (Exception e) {
                         log.warn(
                                 "Fallback content comparison failed, fallback to COPY. source={}, target={}",
-                                sourceFilePath,
-                                targetFilePath,
+                                maskUriUserInfo(sourceFilePath),
+                                maskUriUserInfo(targetFilePath),
                                 e);
                         return true;
                     }
                 }
-                return !checksumEquals(sourceChecksum, targetChecksum);
+                if (checksumEquals(sourceChecksum, targetChecksum)) {
+                    logUpdateModeSkip(
+                            sourceFilePath, targetFilePath, "strict checksum: checksum equal");
+                    return false;
+                }
+                return true;
             }
         }
 
@@ -790,7 +865,17 @@ public abstract class AbstractReadStrategy implements ReadStrategy {
         return new Path(targetBasePath, cleanRelativePath).toString();
     }
 
-    private static String resolveRelativePath(String basePath, String fullFilePath) {
+    /**
+     * Resolve relative path from {@code basePath} to {@code fullFilePath}.
+     *
+     * <p><b>NOTE:</b> This method is intended for internal use by specific read strategies (for
+     * example {@link BinaryReadStrategy}) that need custom path resolution logic.
+     *
+     * @param basePath base directory path
+     * @param fullFilePath full file path
+     * @return relative path from base to file
+     */
+    protected static String resolveRelativePath(String basePath, String fullFilePath) {
         String base = normalizePathPart(basePath);
         String file = normalizePathPart(fullFilePath);
         if (StringUtils.isBlank(file)) {
@@ -824,6 +909,35 @@ public abstract class AbstractReadStrategy implements ReadStrategy {
         }
     }
 
+    private static String maskUriUserInfo(String rawPath) {
+        if (StringUtils.isBlank(rawPath)) {
+            return rawPath;
+        }
+        try {
+            java.net.URI uri = new Path(rawPath).toUri();
+            if (uri.getUserInfo() == null || uri.getAuthority() == null) {
+                return rawPath;
+            }
+            String maskedAuthority = uri.getAuthority().replace(uri.getUserInfo() + "@", "***@");
+            return uri.getScheme()
+                    + "://"
+                    + maskedAuthority
+                    + (uri.getPath() == null ? "" : uri.getPath());
+        } catch (Exception e) {
+            return rawPath;
+        }
+    }
+
+    private void logUpdateModeSkip(String sourceFilePath, String targetFilePath, String reason) {
+        if (log.isDebugEnabled()) {
+            log.debug(
+                    "Update sync mode skipped file: source={}, target={}, reason={}",
+                    maskUriUserInfo(sourceFilePath),
+                    maskUriUserInfo(targetFilePath),
+                    reason);
+        }
+    }
+
     private static <E extends Enum<E>> E parseEnumValue(
             Class<E> enumClass, String rawValue, String optionKey) {
         if (StringUtils.isBlank(rawValue)) {
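
Reviewer note: the skip/copy decision and the log masking introduced in the hunks above can be tried in isolation with the standalone sketch below. It is not the connector code. The class `UpdateSyncSketch`, its `Strategy` enum, and the method names are hypothetical, and the sketch assumes that a missing target file and a length mismatch were already handled before the mtime comparison (the hunk does not show that part). The masking here uses plain `java.net.URI` and a substring instead of Hadoop's `Path` and `String.replace`, so edge cases may differ from the committed method.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical, simplified stand-in for the logic added to AbstractReadStrategy.
final class UpdateSyncSketch {
    enum Strategy { DISTCP, STRICT }

    /**
     * Decides from modification times alone whether the source file should be copied again.
     * Assumption: a missing target and a length mismatch were handled by the caller.
     */
    static boolean shouldSync(Strategy strategy, long sourceMtime, long targetMtime) {
        if (strategy == Strategy.DISTCP) {
            // distcp-style: copy only when the source is strictly newer than the target
            return sourceMtime > targetMtime;
        }
        // strict with len_mtime: any mtime difference triggers a copy
        return sourceMtime != targetMtime;
    }

    /** Replaces "user:password@" in a URI with "***@"; any other input is returned unchanged. */
    static String maskUserInfo(String rawPath) {
        if (rawPath == null || rawPath.trim().isEmpty()) {
            return rawPath;
        }
        try {
            URI uri = new URI(rawPath);
            String userInfo = uri.getRawUserInfo();
            String authority = uri.getRawAuthority();
            if (userInfo == null || authority == null) {
                return rawPath;
            }
            // The raw authority starts with "<userInfo>@", so skip that prefix.
            String hostPart = authority.substring(userInfo.length() + 1);
            String path = uri.getRawPath() == null ? "" : uri.getRawPath();
            return uri.getScheme() + "://***@" + hostPart + path;
        } catch (URISyntaxException e) {
            // Not a parseable URI (for example a path containing spaces): leave it as is.
            return rawPath;
        }
    }

    public static void main(String[] args) {
        System.out.println(shouldSync(Strategy.DISTCP, 100L, 200L));
        System.out.println(shouldSync(Strategy.STRICT, 100L, 200L));
        System.out.println(maskUserInfo("sftp://user:secret@host:22/data/a.bin"));
    }
}
```

With a target that is newer than the source, the `DISTCP` branch skips the file while the `STRICT` branch copies it again, which is the practical difference between the two `update_strategy` values.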
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/BinaryReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/BinaryReadStrategy.java
index af6431646a..a5867d562d 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/BinaryReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/BinaryReadStrategy.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.file.source.reader;
 
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.MetadataUtil;
@@ -29,8 +30,8 @@ import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.Path;
 
-import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Arrays;
@@ -45,7 +46,8 @@ public class BinaryReadStrategy extends AbstractReadStrategy {
                         PrimitiveByteArrayType.INSTANCE, BasicType.STRING_TYPE, BasicType.LONG_TYPE
                     });
 
-    private File basePath;
+    private String basePath;
+    private transient boolean basePathIsFile;
     private int binaryChunkSize = FileBaseSourceOptions.BINARY_CHUNK_SIZE.defaultValue();
     private boolean completeFileMode =
             FileBaseSourceOptions.BINARY_COMPLETE_FILE_MODE.defaultValue();
@@ -53,7 +55,16 @@ public class BinaryReadStrategy extends AbstractReadStrategy {
     @Override
     public void init(HadoopConf conf) {
         super.init(conf);
-        basePath = new File(pluginConfig.getString(FileBaseSourceOptions.FILE_PATH.key()));
+        basePath = pluginConfig.getString(FileBaseSourceOptions.FILE_PATH.key());
+        try {
+            basePathIsFile = hadoopFileSystemProxy.isFile(basePath);
+        } catch (IOException e) {
+            throw new FileConnectorException(
+                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                    "Failed to determine whether file source path is a file or directory: "
+                            + basePath,
+                    e);
+        }
 
         // Load binary chunk size configuration
         if (pluginConfig.hasPath(FileBaseSourceOptions.BINARY_CHUNK_SIZE.key())) {
@@ -80,18 +91,7 @@ public class BinaryReadStrategy extends AbstractReadStrategy {
     public void read(String path, String tableId, Collector<SeaTunnelRow> output)
             throws IOException, FileConnectorException {
         try (InputStream inputStream = hadoopFileSystemProxy.getInputStream(path)) {
-            String relativePath;
-            if (hadoopFileSystemProxy.isFile(basePath.getAbsolutePath())) {
-                relativePath = basePath.getName();
-            } else {
-                relativePath =
-                        path.substring(
-                                path.indexOf(basePath.getAbsolutePath())
-                                        + basePath.getAbsolutePath().length());
-                if (relativePath.startsWith(File.separator)) {
-                    relativePath = relativePath.substring(File.separator.length());
-                }
-            }
+            String relativePath = resolveBinaryRelativePath(path);
 
             if (completeFileMode) {
                 // Read entire file as a single chunk
@@ -109,6 +109,13 @@ public class BinaryReadStrategy extends AbstractReadStrategy {
         }
     }
 
+    private String resolveBinaryRelativePath(String filePath) {
+        if (basePathIsFile) {
+            return new Path(filePath).getName();
+        }
+        return resolveRelativePath(basePath, filePath);
+    }
+
     /** Read the entire file as a single chunk. */
     private void readCompleteFile(
             InputStream inputStream,
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/reader/BinaryReadStrategyTest.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/reader/BinaryReadStrategyTest.java
index fd969c2095..259e997d7b 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/reader/BinaryReadStrategyTest.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/reader/BinaryReadStrategyTest.java
@@ -139,8 +139,43 @@ public class BinaryReadStrategyTest {
         Assertions.assertEquals(0L, row.getField(2));
     }
 
+    @Test
+    public void testBinaryRelativePath_WhenBaseIsFile() throws IOException {
+        File testFile = createTestFile("test_binary_base_is_file.bin", 10);
+
+        Config config = createConfig(testFile.getAbsolutePath(), null, null);
+        binaryReadStrategy.setPluginConfig(config);
+        binaryReadStrategy.init(localConf);
+
+        TestCollector collector = new TestCollector();
+        binaryReadStrategy.read(testFile.getAbsolutePath(), "test_table", collector);
+
+        List<SeaTunnelRow> rows = collector.getRows();
+        Assertions.assertFalse(rows.isEmpty());
+        Assertions.assertEquals("test_binary_base_is_file.bin", rows.get(0).getField(1));
+    }
+
+    @Test
+    public void testBinaryRelativePath_WhenBaseIsDirectoryWithSubDir() throws IOException {
+        File testFile = createTestFile("subdir/test_binary_in_sub.bin", 10);
+
+        Config config = createConfig(tempDir.toFile().getAbsolutePath(), null, null);
+        binaryReadStrategy.setPluginConfig(config);
+        binaryReadStrategy.init(localConf);
+
+        TestCollector collector = new TestCollector();
+        binaryReadStrategy.read(testFile.getAbsolutePath(), "test_table", collector);
+
+        List<SeaTunnelRow> rows = collector.getRows();
+        Assertions.assertFalse(rows.isEmpty());
+        Assertions.assertEquals("subdir/test_binary_in_sub.bin", rows.get(0).getField(1));
+    }
+
     private File createTestFile(String fileName, int sizeInBytes) throws IOException {
         File testFile = tempDir.resolve(fileName).toFile();
+        if (testFile.getParentFile() != null) {
+            testFile.getParentFile().mkdirs();
+        }
 
         if (sizeInBytes > 0) {
             try (FileOutputStream fos = new FileOutputStream(testFile)) {
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategyTest.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategyTest.java
index c330d8ba36..74b4f7abe9 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategyTest.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategyTest.java
@@ -381,6 +381,30 @@ public class AbstractReadStrategyTest {
         }
     }
 
+    @Test
+    void testResolveRelativePathWithSftpUri() {
+        String basePath = "sftp://server:22/path";
+        String fullFilePath = "sftp://server:22/path/sub/file.txt";
+        Assertions.assertEquals(
+                "sub/file.txt", AbstractReadStrategy.resolveRelativePath(basePath, fullFilePath));
+    }
+
+    @Test
+    void testResolveRelativePathWithFtpUri() {
+        String basePath = "ftp://server:21/tmp/seatunnel/read";
+        String fullFilePath = "ftp://server:21/tmp/seatunnel/read/file.txt";
+        Assertions.assertEquals(
+                "file.txt", AbstractReadStrategy.resolveRelativePath(basePath, fullFilePath));
+    }
+
+    @Test
+    void testResolveRelativePathWithCustomSchemeUri() {
+        String basePath = "default.default_sftp://sftp:22/tmp/seatunnel/update/src";
+        String fullFilePath = "default.default_sftp://sftp:22/tmp/seatunnel/update/src/test.bin_0";
+        Assertions.assertEquals(
+                "test.bin_0", AbstractReadStrategy.resolveRelativePath(basePath, fullFilePath));
+    }
+
     private static Map<String, Object> buildBasePluginConfigWithPartitions() {
         Map<String, Object> config = new HashMap<>();
         config.put(FileBaseSourceOptions.FILE_PATH.key(), "/tmp/dt=2024-01-01");
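
Reviewer note: the body of `resolveRelativePath` is not part of this diff, so the sketch below is only one plausible way to satisfy the three new test cases, not the committed implementation. The class `RelativePathSketch` and its helpers are hypothetical. It works on strings rather than `java.net.URI`, because a scheme such as `default.default_sftp` contains an underscore and `java.net.URI` rejects it. Returning the normalized file path unchanged when the file is not under the base is a choice made for this sketch.

```java
// Hypothetical, string-based sketch of relative path resolution for URI-like paths.
final class RelativePathSketch {

    /** Drops a leading "scheme://authority" if present, keeping only the path part. */
    static String pathPart(String raw) {
        int schemeEnd = raw.indexOf("://");
        if (schemeEnd < 0) {
            return raw;
        }
        int firstSlash = raw.indexOf('/', schemeEnd + 3);
        return firstSlash < 0 ? "/" : raw.substring(firstSlash);
    }

    /** Removes leading and trailing '/' characters. */
    static String trimSlashes(String s) {
        int start = 0;
        int end = s.length();
        while (start < end && s.charAt(start) == '/') {
            start++;
        }
        while (end > start && s.charAt(end - 1) == '/') {
            end--;
        }
        return s.substring(start, end);
    }

    /** Returns the part of fullFilePath below basePath, using '/' as the separator. */
    static String relativize(String basePath, String fullFilePath) {
        String base = trimSlashes(pathPart(basePath));
        String file = trimSlashes(pathPart(fullFilePath));
        if (base.isEmpty()) {
            return file;
        }
        if (file.startsWith(base + "/")) {
            return file.substring(base.length() + 1);
        }
        // Sketch-only fallback: the file is not under the base directory.
        return file;
    }

    public static void main(String[] args) {
        System.out.println(relativize("sftp://server:22/path", "sftp://server:22/path/sub/file.txt"));
        System.out.println(relativize("/data/in/", "/data/in/sub/a.bin"));
    }
}
```

Comparing against `base + "/"` rather than `base` alone avoids treating `/data/input` as a child of `/data/in`. Windows-style separators are not handled here.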
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java
index 02df6d2989..765fb318a7 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java
@@ -27,6 +27,7 @@ import org.apache.seatunnel.api.table.factory.TableSourceFactory;
 import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileSyncMode;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
 import org.apache.seatunnel.connectors.seatunnel.file.ftp.config.FtpFileSourceOptions;
 
@@ -101,6 +102,15 @@ public class FtpFileSourceFactory implements TableSourceFactory {
                 .optional(FtpFileSourceOptions.FTP_CONTROL_ENCODING)
                 .optional(FileBaseSourceOptions.QUOTE_CHAR)
                 .optional(FileBaseSourceOptions.ESCAPE_CHAR)
+                .optional(
+                        FileBaseSourceOptions.SYNC_MODE,
+                        FileBaseSourceOptions.TARGET_HADOOP_CONF,
+                        FileBaseSourceOptions.UPDATE_STRATEGY,
+                        FileBaseSourceOptions.COMPARE_MODE)
+                .conditional(
+                        FileBaseSourceOptions.SYNC_MODE,
+                        FileSyncMode.UPDATE,
+                        FileBaseSourceOptions.TARGET_PATH)
                 .build();
     }
 
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/FtpFileFactoryTest.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/FtpFileFactoryTest.java
index 529d1be814..dcf098e05f 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/FtpFileFactoryTest.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/FtpFileFactoryTest.java
@@ -17,6 +17,11 @@
 
 package org.apache.seatunnel.connectors.seatunnel.file.ftp;
 
+import org.apache.seatunnel.api.configuration.util.Expression;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.configuration.util.RequiredOption;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileSyncMode;
 import org.apache.seatunnel.connectors.seatunnel.file.ftp.sink.FtpFileSinkFactory;
 import org.apache.seatunnel.connectors.seatunnel.file.ftp.source.FtpFileSourceFactory;
 
@@ -27,7 +32,28 @@ class FtpFileFactoryTest {
 
     @Test
     void optionRule() {
-        Assertions.assertNotNull((new FtpFileSourceFactory()).optionRule());
+        OptionRule optionRule = (new FtpFileSourceFactory()).optionRule();
+        Assertions.assertNotNull(optionRule);
+        Assertions.assertTrue(
+                optionRule.getOptionalOptions().contains(FileBaseSourceOptions.SYNC_MODE));
+        Assertions.assertTrue(
+                optionRule.getOptionalOptions().contains(FileBaseSourceOptions.TARGET_HADOOP_CONF));
+        Assertions.assertTrue(
+                optionRule.getOptionalOptions().contains(FileBaseSourceOptions.UPDATE_STRATEGY));
+        Assertions.assertTrue(
+                optionRule.getOptionalOptions().contains(FileBaseSourceOptions.COMPARE_MODE));
+
+        Expression expectExpression =
+                Expression.of(FileBaseSourceOptions.SYNC_MODE, FileSyncMode.UPDATE);
+        Assertions.assertTrue(
+                optionRule.getRequiredOptions().stream()
+                        .filter(RequiredOption.ConditionalRequiredOptions.class::isInstance)
+                        .map(RequiredOption.ConditionalRequiredOptions.class::cast)
+                        .filter(
+                                required ->
+                                        required.getOptions()
+                                                .contains(FileBaseSourceOptions.TARGET_PATH))
+                        .anyMatch(required -> expectExpression.equals(required.getExpression())));
         Assertions.assertNotNull((new FtpFileSinkFactory()).optionRule());
     }
 }
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
index 987558810e..a035b19f1a 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
@@ -29,6 +29,7 @@ import org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseOptions;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSinkOptions;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileSyncMode;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
 import org.apache.seatunnel.connectors.seatunnel.file.local.config.LocalFileSourceOptions;
 
@@ -108,6 +109,15 @@ public class LocalFileSourceFactory implements TableSourceFactory {
                 .optional(FileBaseSourceOptions.READ_COLUMNS)
                 .optional(FileBaseSourceOptions.QUOTE_CHAR)
                 .optional(FileBaseSourceOptions.ESCAPE_CHAR)
+                .optional(
+                        FileBaseSourceOptions.SYNC_MODE,
+                        FileBaseSourceOptions.TARGET_HADOOP_CONF,
+                        FileBaseSourceOptions.UPDATE_STRATEGY,
+                        FileBaseSourceOptions.COMPARE_MODE)
+                .conditional(
+                        FileBaseSourceOptions.SYNC_MODE,
+                        FileSyncMode.UPDATE,
+                        FileBaseSourceOptions.TARGET_PATH)
                 .build();
     }
 
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/local/LocalFileFactoryTest.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/local/LocalFileFactoryTest.java
index 693f81578a..859d9d3148 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/local/LocalFileFactoryTest.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/local/LocalFileFactoryTest.java
@@ -17,6 +17,11 @@
 
 package org.apache.seatunnel.connectors.seatunnel.file.local;
 
+import org.apache.seatunnel.api.configuration.util.Expression;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.configuration.util.RequiredOption;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileSyncMode;
 import org.apache.seatunnel.connectors.seatunnel.file.local.sink.LocalFileSinkFactory;
 import org.apache.seatunnel.connectors.seatunnel.file.local.source.LocalFileSourceFactory;
 
@@ -28,6 +33,27 @@ class LocalFileFactoryTest {
     @Test
     void optionRule() {
         Assertions.assertNotNull((new LocalFileSinkFactory()).optionRule());
-        Assertions.assertNotNull((new LocalFileSourceFactory()).optionRule());
+        OptionRule optionRule = (new LocalFileSourceFactory()).optionRule();
+        Assertions.assertNotNull(optionRule);
+        Assertions.assertTrue(
+                optionRule.getOptionalOptions().contains(FileBaseSourceOptions.SYNC_MODE));
+        Assertions.assertTrue(
+                optionRule.getOptionalOptions().contains(FileBaseSourceOptions.TARGET_HADOOP_CONF));
+        Assertions.assertTrue(
+                optionRule.getOptionalOptions().contains(FileBaseSourceOptions.UPDATE_STRATEGY));
+        Assertions.assertTrue(
+                optionRule.getOptionalOptions().contains(FileBaseSourceOptions.COMPARE_MODE));
+
+        Expression expectExpression =
+                Expression.of(FileBaseSourceOptions.SYNC_MODE, FileSyncMode.UPDATE);
+        Assertions.assertTrue(
+                optionRule.getRequiredOptions().stream()
+                        .filter(RequiredOption.ConditionalRequiredOptions.class::isInstance)
+                        .map(RequiredOption.ConditionalRequiredOptions.class::cast)
+                        .filter(
+                                required ->
+                                        required.getOptions()
+                                                .contains(FileBaseSourceOptions.TARGET_PATH))
+                        .anyMatch(required -> expectExpression.equals(required.getExpression())));
     }
 }
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java
index 6ed53d3bed..eb363eb22c 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java
@@ -27,6 +27,7 @@ import org.apache.seatunnel.api.table.factory.TableSourceFactory;
 import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileSyncMode;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
 import org.apache.seatunnel.connectors.seatunnel.file.sftp.config.SftpFileSourceOptions;
 
@@ -90,6 +91,15 @@ public class SftpFileSourceFactory implements 
TableSourceFactory {
                 .optional(FileBaseSourceOptions.READ_COLUMNS)
                 .optional(FileBaseSourceOptions.QUOTE_CHAR)
                 .optional(FileBaseSourceOptions.ESCAPE_CHAR)
+                .optional(
+                        FileBaseSourceOptions.SYNC_MODE,
+                        FileBaseSourceOptions.TARGET_HADOOP_CONF,
+                        FileBaseSourceOptions.UPDATE_STRATEGY,
+                        FileBaseSourceOptions.COMPARE_MODE)
+                .conditional(
+                        FileBaseSourceOptions.SYNC_MODE,
+                        FileSyncMode.UPDATE,
+                        FileBaseSourceOptions.TARGET_PATH)
                 .build();
     }
 
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/SftpFileFactoryTest.java b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/SftpFileFactoryTest.java
index d1964d3e98..746abaa0d6 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/SftpFileFactoryTest.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/SftpFileFactoryTest.java
@@ -17,6 +17,11 @@
 
 package org.apache.seatunnel.connectors.seatunnel.file.sftp;
 
+import org.apache.seatunnel.api.configuration.util.Expression;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.configuration.util.RequiredOption;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileSyncMode;
 import org.apache.seatunnel.connectors.seatunnel.file.sftp.sink.SftpFileSinkFactory;
 import org.apache.seatunnel.connectors.seatunnel.file.sftp.source.SftpFileSourceFactory;
 
@@ -27,7 +32,28 @@ class SftpFileFactoryTest {
 
     @Test
     void optionRule() {
-        Assertions.assertNotNull((new SftpFileSourceFactory()).optionRule());
+        OptionRule optionRule = (new SftpFileSourceFactory()).optionRule();
+        Assertions.assertNotNull(optionRule);
+        Assertions.assertTrue(
+                optionRule.getOptionalOptions().contains(FileBaseSourceOptions.SYNC_MODE));
+        Assertions.assertTrue(
+                optionRule.getOptionalOptions().contains(FileBaseSourceOptions.TARGET_HADOOP_CONF));
+        Assertions.assertTrue(
+                optionRule.getOptionalOptions().contains(FileBaseSourceOptions.UPDATE_STRATEGY));
+        Assertions.assertTrue(
+                optionRule.getOptionalOptions().contains(FileBaseSourceOptions.COMPARE_MODE));
+
+        Expression expectExpression =
+                Expression.of(FileBaseSourceOptions.SYNC_MODE, FileSyncMode.UPDATE);
+        Assertions.assertTrue(
+                optionRule.getRequiredOptions().stream()
+                        .filter(RequiredOption.ConditionalRequiredOptions.class::isInstance)
+                        .map(RequiredOption.ConditionalRequiredOptions.class::cast)
+                        .filter(
+                                required ->
+                                        required.getOptions()
+                                                .contains(FileBaseSourceOptions.TARGET_PATH))
+                        .anyMatch(required -> expectExpression.equals(required.getExpression())));
         Assertions.assertNotNull((new SftpFileSinkFactory()).optionRule());
     }
 }
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java
index 575fe7f9ad..2e84ff4bf4 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java
@@ -75,6 +75,8 @@ public class FtpFileIT extends TestSuiteBase implements TestResource {
 
     private GenericContainer<?> ftpContainer;
 
+    private String ftpHomeDir;
+
     private String ftpPassiveAddress;
 
     private BiFunction<Integer, Integer, Integer[]> generateExposedPorts =
@@ -134,36 +136,39 @@ public class FtpFileIT extends TestSuiteBase implements TestResource {
 
         log.info("ftp container started");
 
+        ftpHomeDir = getFtpUserHomeDir();
+
         ContainerUtil.copyFileIntoContainers(
                 "/json/e2e.json",
-                "/home/vsftpd/seatunnel/tmp/seatunnel/read/json/name=tyrantlucifer/hobby=coding/e2e.json",
+                ftpHomeDir + "/tmp/seatunnel/read/json/name=tyrantlucifer/hobby=coding/e2e.json",
                 ftpContainer);
 
         ContainerUtil.copyFileIntoContainers(
                 "/text/e2e.txt",
-                "/home/vsftpd/seatunnel/tmp/seatunnel/read/text/name=tyrantlucifer/hobby=coding/e2e.txt",
+                ftpHomeDir + "/tmp/seatunnel/read/text/name=tyrantlucifer/hobby=coding/e2e.txt",
                 ftpContainer);
 
         ContainerUtil.copyFileIntoContainers(
                 "/text/e2e-txt.zip",
-                "/home/vsftpd/seatunnel/tmp/seatunnel/read/zip/txt/single/e2e-txt.zip",
+                ftpHomeDir + "/tmp/seatunnel/read/zip/txt/single/e2e-txt.zip",
                 ftpContainer);
 
         ContainerUtil.copyFileIntoContainers(
                 "/excel/e2e.xlsx",
-                "/home/vsftpd/seatunnel/tmp/seatunnel/read/excel/name=tyrantlucifer/hobby=coding/e2e.xlsx",
+                ftpHomeDir + "/tmp/seatunnel/read/excel/name=tyrantlucifer/hobby=coding/e2e.xlsx",
                 ftpContainer);
 
         ContainerUtil.copyFileIntoContainers(
                 "/excel/e2e.xlsx",
-                "/home/vsftpd/seatunnel/tmp/seatunnel/read/excel_filter/name=tyrantlucifer/hobby=coding/e2e_filter.xlsx",
+                ftpHomeDir
+                        + "/tmp/seatunnel/read/excel_filter/name=tyrantlucifer/hobby=coding/e2e_filter.xlsx",
                 ftpContainer);
 
         ContainerUtil.copyFileIntoContainers(
-                "/excel/e2e.xlsx", "/home/vsftpd/seatunnel/e2e.xlsx", ftpContainer);
+                "/excel/e2e.xlsx", ftpHomeDir + "/e2e.xlsx", ftpContainer);
 
-        ftpContainer.execInContainer("sh", "-c", "chmod -R 777 /home/vsftpd/seatunnel/");
-        ftpContainer.execInContainer("sh", "-c", "chown -R ftp:ftp /home/vsftpd/seatunnel/");
+        ftpContainer.execInContainer("sh", "-c", "chmod -R 777 " + ftpHomeDir + "/");
+        ftpContainer.execInContainer("sh", "-c", "chown -R ftp:ftp " + ftpHomeDir + "/");
     }
 
     @TestTemplate
@@ -175,7 +180,7 @@ public class FtpFileIT extends TestSuiteBase implements TestResource {
                 container, "/text/ftp_file_text_to_assert_for_passive.conf", configParams);
         assertJobExecution(container, "/text/fake_to_ftp_file_text_for_passive.conf", configParams);
 
-        String homePath = "/home/vsftpd/seatunnel/tmp/seatunnel/passive_text";
+        String homePath = ftpHomeDir + "/tmp/seatunnel/passive_text";
         // test write ftp text file
         Assertions.assertEquals(1, getFileListFromContainer(homePath).size());
 
@@ -194,7 +199,7 @@ public class FtpFileIT extends TestSuiteBase implements TestResource {
         Container.ExecResult execResult = container.executeJob("/text/ftp_to_ftp_for_binary.conf");
         Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
 
-        String homePath = "/home/vsftpd/seatunnel/uploads/seatunnel";
+        String homePath = ftpHomeDir + "/uploads/seatunnel";
         Assertions.assertEquals(1, getFileListFromContainer(homePath).size());
 
         // Confirm data is written correctly
@@ -206,34 +211,65 @@ public class FtpFileIT extends TestSuiteBase implements TestResource {
         deleteFileFromContainer(homePath);
     }
 
+    @TestTemplate
+    public void testFtpBinaryUpdateModeDistcp(TestContainer container)
+            throws IOException, InterruptedException {
+        resetUpdateTestPath();
+        putFtpFile("/tmp/seatunnel/update/src/test.bin", "abc");
+
+        Container.ExecResult firstRun = container.executeJob("/text/ftp_binary_update_distcp.conf");
+        Assertions.assertEquals(0, firstRun.getExitCode(), firstRun.getStderr());
+        Assertions.assertEquals("abc", readFtpFile("/tmp/seatunnel/update/dst/test.bin"));
+
+        // Make target newer with same length, distcp strategy should SKIP overwrite.
+        putFtpFile("/tmp/seatunnel/update/dst/test.bin", "zzz");
+        Container.ExecResult secondRun =
+                container.executeJob("/text/ftp_binary_update_distcp.conf");
+        Assertions.assertEquals(0, secondRun.getExitCode(), secondRun.getStderr());
+        Assertions.assertEquals("zzz", readFtpFile("/tmp/seatunnel/update/dst/test.bin"));
+
+        // Change source length, distcp strategy should COPY overwrite.
+        putFtpFile("/tmp/seatunnel/update/src/test.bin", "abcd");
+        Container.ExecResult thirdRun = container.executeJob("/text/ftp_binary_update_distcp.conf");
+        Assertions.assertEquals(0, thirdRun.getExitCode(), thirdRun.getStderr());
+        Assertions.assertEquals("abcd", readFtpFile("/tmp/seatunnel/update/dst/test.bin"));
+
+        deleteFileFromContainer(ftpHomeDir + "/tmp/seatunnel/update");
+    }
+
     @TestTemplate
     public void testFtpToAssertForJsonFilter(TestContainer container)
             throws IOException, InterruptedException {
 
         ContainerUtil.copyFileIntoContainers(
                 "/json/e2e.json",
-                "/home/vsftpd/seatunnel/tmp/seatunnel/read/filter/json/name=tyrantlucifer/hobby=coding/e2e.json",
+                ftpHomeDir
+                        + "/tmp/seatunnel/read/filter/json/name=tyrantlucifer/hobby=coding/e2e.json",
                 ftpContainer);
         ContainerUtil.copyFileIntoContainers(
                 "/json/e2e.json",
-                "/home/vsftpd/seatunnel/tmp/seatunnel/read/filter/json2025/name=tyrantlucifer/hobby=coding/e2e_2025.json",
+                ftpHomeDir
+                        + "/tmp/seatunnel/read/filter/json2025/name=tyrantlucifer/hobby=coding/e2e_2025.json",
                 ftpContainer);
         ContainerUtil.copyFileIntoContainers(
                 "/text/e2e.txt",
-                "/home/vsftpd/seatunnel/tmp/seatunnel/read/filter/json2025/name=tyrantlucifer/hobby=coding/e2e_2025.txt",
+                ftpHomeDir
+                        + "/tmp/seatunnel/read/filter/json2025/name=tyrantlucifer/hobby=coding/e2e_2025.txt",
                 ftpContainer);
         ContainerUtil.copyFileIntoContainers(
                 "/json/e2e.json",
-                "/home/vsftpd/seatunnel/tmp/seatunnel/read/filter/json2024/name=tyrantlucifer/hobby=coding/e2e_2024.json",
+                ftpHomeDir
+                        + "/tmp/seatunnel/read/filter/json2024/name=tyrantlucifer/hobby=coding/e2e_2024.json",
                 ftpContainer);
 
         ContainerUtil.copyFileIntoContainers(
                 "/text/e2e.txt",
-                "/home/vsftpd/seatunnel/tmp/seatunnel/read/filter/text/name=tyrantlucifer/hobby=coding/e2e.txt",
+                ftpHomeDir
+                        + "/tmp/seatunnel/read/filter/text/name=tyrantlucifer/hobby=coding/e2e.txt",
                 ftpContainer);
 
-        ftpContainer.execInContainer("sh", "-c", "chmod -R 777 /home/vsftpd/seatunnel/");
-        ftpContainer.execInContainer("sh", "-c", "chown -R ftp:ftp /home/vsftpd/seatunnel/");
+        ftpContainer.execInContainer("sh", "-c", "chmod -R 777 " + ftpHomeDir + "/");
+        ftpContainer.execInContainer("sh", "-c", "chown -R ftp:ftp " + ftpHomeDir + "/");
 
         TestHelper helper = new TestHelper(container);
         // -----filter based on the file directory at the same time, the expression needs to start
@@ -244,7 +280,7 @@ public class FtpFileIT extends TestSuiteBase implements TestResource {
         helper.execute("/json/ftp_to_access_for_json_name_filter.conf");
 
         // delete path
-        String filterPath = "/home/vsftpd/seatunnel/tmp/seatunnel/read/filter";
+        String filterPath = ftpHomeDir + "/tmp/seatunnel/read/filter";
         deleteFileFromContainer(filterPath);
     }
 
@@ -280,6 +316,7 @@ public class FtpFileIT extends TestSuiteBase implements TestResource {
         // test write ftp json file
         helper.execute("/json/fake_to_ftp_file_json.conf");
         // test read ftp json file
+        ensureReadJsonInputFile();
         helper.execute("/json/ftp_file_json_to_assert.conf");
         // test write ftp parquet file
         helper.execute("/parquet/fake_to_ftp_file_parquet.conf");
@@ -289,7 +326,7 @@ public class FtpFileIT extends TestSuiteBase implements TestResource {
         helper.execute("/excel/fake_source_to_ftp_root_path_excel.conf");
         // test ftp source support multipleTable
 
-        String homePath = "/home/vsftpd/seatunnel";
+        String homePath = ftpHomeDir;
         String sink01 = "/tmp/seatunnel/json/sink/multiplesource/fake01";
         String sink02 = "/tmp/seatunnel/json/sink/multiplesource/fake02";
         deleteFileFromContainer(homePath + sink01);
@@ -308,7 +345,7 @@ public class FtpFileIT extends TestSuiteBase implements TestResource {
         String specialPath = "/tmp/seatunnel/test spaces";
         String fileName = "file with spaces.txt";
         String fullPath = specialPath + "/" + fileName;
-        String homePath = "/home/vsftpd/seatunnel";
+        String homePath = ftpHomeDir;
         String containerPath = homePath + fullPath;
 
         try {
@@ -355,7 +392,7 @@ public class FtpFileIT extends TestSuiteBase implements TestResource {
             throws IOException, InterruptedException {
         TestHelper helper = new TestHelper(container);
         // test mult table and save_mode:RECREATE_SCHEMA DROP_DATA
-        String homePath = "/home/vsftpd/seatunnel";
+        String homePath = ftpHomeDir;
         String path1 = "/tmp/seatunnel_mult/text/source_1";
         String path2 = "/tmp/seatunnel_mult/text/source_2";
         deleteFileFromContainer(homePath + path1);
@@ -383,6 +420,138 @@ public class FtpFileIT extends TestSuiteBase implements TestResource {
         Assertions.assertEquals(getFileListFromContainer(homePath + path4).size(), 2);
     }
 
+    private void resetUpdateTestPath() throws IOException, InterruptedException {
+        deleteFileFromContainer(ftpHomeDir + "/tmp/seatunnel/update");
+        Container.ExecResult mkdirResult =
+                ftpContainer.execInContainer(
+                        "sh",
+                        "-c",
+                        "mkdir -p "
+                                + ftpHomeDir
+                                + "/tmp/seatunnel/update/src "
+                                + ftpHomeDir
+                                + "/tmp/seatunnel/update/dst "
+                                + ftpHomeDir
+                                + "/tmp/seatunnel/update/tmp");
+        Assertions.assertEquals(0, mkdirResult.getExitCode(), mkdirResult.getStderr());
+        ftpContainer.execInContainer(
+                "sh", "-c", "chmod -R 777 " + ftpHomeDir + "/tmp/seatunnel/update || true");
+        ftpContainer.execInContainer(
+                "sh", "-c", "chown -R ftp:ftp " + ftpHomeDir + "/tmp/seatunnel/update || true");
+    }
+
+    private void putFtpFile(String ftpPath, String content)
+            throws IOException, InterruptedException {
+        String containerPath = ftpHomeDir + ftpPath;
+        String command =
+                "mkdir -p $(dirname '"
+                        + containerPath
+                        + "') && printf '"
+                        + content
+                        + "' > '"
+                        + containerPath
+                        + "' && chmod 666 '"
+                        + containerPath
+                        + "'";
+        Container.ExecResult putResult = ftpContainer.execInContainer("sh", "-c", command);
+        Assertions.assertEquals(0, putResult.getExitCode(), putResult.getStderr());
+    }
+
+    private String readFtpFile(String ftpPath) throws IOException, InterruptedException {
+        String containerPath = ftpHomeDir + ftpPath;
+        Container.ExecResult catResult =
+                ftpContainer.execInContainer("sh", "-c", "cat '" + containerPath + "'");
+        Assertions.assertEquals(0, catResult.getExitCode(), catResult.getStderr());
+        return catResult.getStdout() == null ? "" : catResult.getStdout().trim();
+    }
+
+    private String getFtpUserHomeDir() throws IOException, InterruptedException {
+        // Prefer vsftpd local_root as the real filesystem root used by FTP paths in test configs.
+        // In some images, FTP users are created as virtual users and may not exist in /etc/passwd.
+        try {
+            Container.ExecResult confResult =
+                    ftpContainer.execInContainer("sh", "-c", "cat /etc/vsftpd/vsftpd.conf");
+            if (confResult.getExitCode() == 0 && StringUtils.isNotBlank(confResult.getStdout())) {
+                Properties properties = new Properties();
+                properties.load(new StringReader(confResult.getStdout()));
+                String localRoot = properties.getProperty("local_root");
+                if (StringUtils.isNotBlank(localRoot)) {
+                    String resolved =
+                            localRoot
+                                    .trim()
+                                    .replace("${FTP_USER}", USERNAME)
+                                    .replace("$FTP_USER", USERNAME)
+                                    .replace("${USER}", USERNAME)
+                                    .replace("$USER", USERNAME);
+                    if (StringUtils.isNotBlank(resolved)) {
+                        return resolved;
+                    }
+                }
+            }
+        } catch (Exception e) {
+            log.warn("Failed to resolve ftp local_root from vsftpd.conf, fallback to default.", e);
+        }
+
+        // Fallback: resolve from /etc/passwd if user exists
+        Container.ExecResult homeResult =
+                ftpContainer.execInContainer(
+                        "sh",
+                        "-c",
+                        "awk -F: '$1==\""
+                                + USERNAME
+                                + "\"{print $6}' /etc/passwd 2>/dev/null || true");
+        if (homeResult.getExitCode() == 0) {
+            String homeDir = homeResult.getStdout() == null ? "" : homeResult.getStdout().trim();
+            if (StringUtils.isNotBlank(homeDir)) {
+                return homeDir;
+            }
+        }
+
+        // Last resort: use default directory used by fauria/vsftpd.
+        String defaultRoot = "/home/vsftpd";
+        if (containerDirExists(defaultRoot)) {
+            log.warn(
+                    "Cannot resolve ftp home directory for user: {}, fallback to {}",
+                    USERNAME,
+                    defaultRoot);
+            return defaultRoot;
+        }
+        String defaultUserRoot = defaultRoot + "/" + USERNAME;
+        log.warn(
+                "Cannot resolve ftp home directory for user: {}, fallback to {}",
+                USERNAME,
+                defaultUserRoot);
+        return defaultUserRoot;
+    }
+
+    private boolean containerDirExists(String path) throws IOException, InterruptedException {
+        Container.ExecResult result =
+                ftpContainer.execInContainer(
+                        "sh", "-c", "test -d '" + path + "' && echo true || echo false");
+        return result.getExitCode() == 0
+                && StringUtils.equalsIgnoreCase(
+                        (result.getStdout() == null ? "" : result.getStdout().trim()), "true");
+    }
+
+    private void ensureReadJsonInputFile() throws IOException, InterruptedException {
+        Container.ExecResult mkdirResult =
+                ftpContainer.execInContainer(
+                        "sh",
+                        "-c",
+                        "mkdir -p "
+                                + ftpHomeDir
+                                + "/tmp/seatunnel/read/json/name=tyrantlucifer/hobby=coding");
+        Assertions.assertEquals(0, mkdirResult.getExitCode(), mkdirResult.getStderr());
+        ContainerUtil.copyFileIntoContainers(
+                "/json/e2e.json",
+                ftpHomeDir + "/tmp/seatunnel/read/json/name=tyrantlucifer/hobby=coding/e2e.json",
+                ftpContainer);
+        Container.ExecResult chmodResult =
+                ftpContainer.execInContainer(
+                        "sh", "-c", "chmod -R 777 " + ftpHomeDir + "/tmp/seatunnel/read");
+        Assertions.assertEquals(0, chmodResult.getExitCode(), chmodResult.getStderr());
+    }
+
     @SneakyThrows
     private List<String> getFileListFromContainer(String path) {
         String command = "ls -1 " + path;
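Reviewer note on getFtpUserHomeDir() above: the first lookup step (load vsftpd.conf as Java properties, read local_root, substitute the user placeholders) can be exercised without a container. The following is a minimal standalone sketch of that step only; the class name LocalRootResolver, the method name resolveLocalRoot, and the sample configuration text are hypothetical and are not part of this commit.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper, not part of the commit: mirrors only the local_root step.
class LocalRootResolver {

    // Returns the resolved local_root, or null when the key is absent or blank.
    static String resolveLocalRoot(String vsftpdConf, String username) {
        Properties properties = new Properties();
        try {
            // vsftpd.conf uses key=value lines and '#' comments, which Properties can parse.
            properties.load(new StringReader(vsftpdConf));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String localRoot = properties.getProperty("local_root");
        if (localRoot == null || localRoot.trim().isEmpty()) {
            return null;
        }
        // Same placeholder order as the test helper in the diff.
        return localRoot
                .trim()
                .replace("${FTP_USER}", username)
                .replace("$FTP_USER", username)
                .replace("${USER}", username)
                .replace("$USER", username);
    }

    public static void main(String[] args) {
        // Sample text is an assumption; a real image may lay out its config differently.
        String conf = "# sample config\nlisten=YES\nlocal_root=/home/vsftpd/$USER\n";
        System.out.println(resolveLocalRoot(conf, "seatunnel"));
    }
}
```

When the method returns null, the helper in the diff falls through to the /etc/passwd lookup and then to the /home/vsftpd default.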
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/text/ftp_binary_update_distcp.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/text/ftp_binary_update_distcp.conf
new file mode 100644
index 0000000000..7806f6a311
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/text/ftp_binary_update_distcp.conf
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FtpFile {
+    host = "ftp"
+    port = 21
+    user = seatunnel
+    password = pass
+
+    path = "/tmp/seatunnel/update/src"
+    file_format_type = "binary"
+
+    sync_mode = "update"
+    target_path = "/tmp/seatunnel/update/dst"
+    update_strategy = "distcp"
+    compare_mode = "len_mtime"
+  }
+}
+
+sink {
+  FtpFile {
+    host = "ftp"
+    port = 21
+    user = seatunnel
+    password = pass
+
+    path = "/tmp/seatunnel/update/dst"
+    tmp_path = "/tmp/seatunnel/update/tmp"
+    file_format_type = "binary"
+  }
+}
+
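Reviewer note on the config above: the three runs in testFtpBinaryUpdateModeDistcp imply a per-file decision for update_strategy = "distcp" with compare_mode = "len_mtime". The sketch below restates that decision as a pure function. It is inferred only from the test comments ("SKIP" when the target is newer with the same length, "COPY" when the source length changes); the real logic lives in AbstractReadStrategy, which is not shown here, so the class name, method name, and the exact mtime rule (target not older than source) are assumptions.

```java
// Hypothetical sketch, not the connector's implementation.
class UpdateDecisionSketch {

    enum Action { COPY, SKIP }

    // Assumed rule: copy when the target is missing or the lengths differ;
    // skip when lengths match and the target is not older than the source.
    static Action decide(
            boolean targetExists,
            long sourceLength,
            long sourceMtimeMillis,
            long targetLength,
            long targetMtimeMillis) {
        if (!targetExists) {
            return Action.COPY;
        }
        if (sourceLength != targetLength) {
            return Action.COPY;
        }
        return targetMtimeMillis >= sourceMtimeMillis ? Action.SKIP : Action.COPY;
    }

    public static void main(String[] args) {
        // Run 1 of the test: target does not exist yet.
        System.out.println(decide(false, 3, 1_000L, 0, 0L));
        // Run 2: target rewritten as "zzz" (same length, newer mtime).
        System.out.println(decide(true, 3, 1_000L, 3, 2_000L));
        // Run 3: source rewritten as "abcd" (length 4 vs 3).
        System.out.println(decide(true, 4, 3_000L, 3, 2_000L));
    }
}
```

Under these assumptions the second run leaves "zzz" in place even though the bytes differ, which is exactly what the E2E test asserts; a checksum-based compare mode would be needed to detect that case.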
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
index bc1fb26d93..c6a4c39ded 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
@@ -40,6 +40,7 @@ import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.shaded.com.github.dockerjava.core.command.ExecStartResultCallback;
 
@@ -422,6 +423,62 @@ public class LocalFileIT extends TestSuiteBase {
         helper.execute("/excel/local_excel_xlsx_gz_to_assert.conf");
     }
 
+    @TestTemplate
+    @DisabledOnContainer(
+            value = {},
+            type = {EngineType.FLINK, EngineType.SPARK},
+            disabledReason =
+                    "sync_mode=update needs to compare source/target on the same filesystem. Local filesystem is not shared between engine master/workers in Flink/Spark E2E.")
+    public void testLocalFileBinaryUpdateModeDistcp(TestContainer container)
+            throws IOException, InterruptedException {
+        resetUpdateTestPath();
+        putLocalFile("/tmp/seatunnel/update/src/test.bin", "abc");
+
+        TestHelper helper = new TestHelper(container);
+        helper.execute("/binary/local_file_binary_update_distcp.conf");
+        Assertions.assertEquals("abc", readLocalFile("/tmp/seatunnel/update/dst/test.bin"));
+
+        // Make target newer with same length, distcp strategy should SKIP overwrite.
+        putLocalFile("/tmp/seatunnel/update/dst/test.bin", "zzz");
+        helper.execute("/binary/local_file_binary_update_distcp.conf");
+        Assertions.assertEquals("zzz", readLocalFile("/tmp/seatunnel/update/dst/test.bin"));
+
+        // Change source length, distcp strategy should COPY overwrite.
+        putLocalFile("/tmp/seatunnel/update/src/test.bin", "abcd");
+        helper.execute("/binary/local_file_binary_update_distcp.conf");
+        Assertions.assertEquals("abcd", readLocalFile("/tmp/seatunnel/update/dst/test.bin"));
+
+        baseContainer.execInContainer("sh", "-c", "rm -rf /tmp/seatunnel/update");
+    }
+
+    @TestTemplate
+    @DisabledOnContainer(
+            value = {},
+            type = {EngineType.FLINK, EngineType.SPARK},
+            disabledReason =
+                    "sync_mode=update needs to compare source/target on the same filesystem. Local filesystem is not shared between engine master/workers in Flink/Spark E2E.")
+    public void testLocalFileBinaryUpdateModeStrictChecksum(TestContainer container)
+            throws IOException, InterruptedException {
+        resetUpdateTestPath();
+        putLocalFile("/tmp/seatunnel/update/src/test.bin", "abc");
+
+        TestHelper helper = new TestHelper(container);
+        helper.execute("/binary/local_file_binary_update_strict_checksum.conf");
+        Assertions.assertEquals("abc", readLocalFile("/tmp/seatunnel/update/dst/test.bin"));
+
+        long firstMtimeSeconds = getLocalFileMtimeSeconds("/tmp/seatunnel/update/dst/test.bin");
+        Thread.sleep(1100);
+
+        helper.execute("/binary/local_file_binary_update_strict_checksum.conf");
+        long secondMtimeSeconds = getLocalFileMtimeSeconds("/tmp/seatunnel/update/dst/test.bin");
+        Assertions.assertEquals(
+                firstMtimeSeconds,
+                secondMtimeSeconds,
+                "Strict checksum should skip unchanged files and keep target mtime");
+
+        baseContainer.execInContainer("sh", "-c", "rm -rf /tmp/seatunnel/update");
+    }
+
     @TestTemplate
     @DisabledOnContainer(
             value = {TestContainerId.SPARK_2_4},
@@ -487,6 +544,46 @@ public class LocalFileIT extends TestSuiteBase {
         Assertions.assertFalse(localFileCatalog.tableExists(tablePath));
     }
 
+    private void resetUpdateTestPath() throws IOException, InterruptedException {
+        Container.ExecResult result =
+                baseContainer.execInContainer(
+                        "sh",
+                        "-c",
+                        "rm -rf /tmp/seatunnel/update && mkdir -p /tmp/seatunnel/update/src /tmp/seatunnel/update/dst /tmp/seatunnel/update/tmp");
+        Assertions.assertEquals(0, result.getExitCode(), result.getStderr());
+    }
+
+    private void putLocalFile(String filePath, String content)
+            throws IOException, InterruptedException {
+        String command =
+                "mkdir -p $(dirname '"
+                        + filePath
+                        + "') && printf '"
+                        + content
+                        + "' > '"
+                        + filePath
+                        + "' && chmod 666 '"
+                        + filePath
+                        + "'";
+        Container.ExecResult result = baseContainer.execInContainer("sh", "-c", command);
+        Assertions.assertEquals(0, result.getExitCode(), result.getStderr());
+    }
+
+    private String readLocalFile(String filePath) throws IOException, InterruptedException {
+        Container.ExecResult result =
+                baseContainer.execInContainer("sh", "-c", "cat '" + filePath + "'");
+        Assertions.assertEquals(0, result.getExitCode(), result.getStderr());
+        return result.getStdout() == null ? "" : result.getStdout().trim();
+    }
+
+    private long getLocalFileMtimeSeconds(String filePath)
+            throws IOException, InterruptedException {
+        Container.ExecResult result =
+                baseContainer.execInContainer("sh", "-c", "stat -c %Y '" + filePath + "'");
+        Assertions.assertEquals(0, result.getExitCode(), result.getStderr());
+        return Long.parseLong(result.getStdout().trim());
+    }
+
     private Path convertToLzoFile(File file) throws IOException {
         LzopCodec lzo = new LzopCodec();
         Path path = Paths.get(file.getAbsolutePath() + ".lzo");
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/binary/local_file_binary_update_distcp.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/binary/local_file_binary_update_distcp.conf
new file mode 100644
index 0000000000..fc5eded175
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/binary/local_file_binary_update_distcp.conf
@@ -0,0 +1,42 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  LocalFile {
+    path = "/tmp/seatunnel/update/src"
+    file_format_type = "binary"
+
+    sync_mode = "update"
+    target_path = "/tmp/seatunnel/update/dst"
+    update_strategy = "distcp"
+    compare_mode = "len_mtime"
+  }
+}
+
+sink {
+  LocalFile {
+    path = "/tmp/seatunnel/update/dst"
+    tmp_path = "/tmp/seatunnel/update/tmp"
+    file_format_type = "binary"
+  }
+}
+
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/binary/local_file_binary_update_strict_checksum.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/binary/local_file_binary_update_strict_checksum.conf
new file mode 100644
index 0000000000..f6d82e5ef2
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/binary/local_file_binary_update_strict_checksum.conf
@@ -0,0 +1,42 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  LocalFile {
+    path = "/tmp/seatunnel/update/src"
+    file_format_type = "binary"
+
+    sync_mode = "update"
+    target_path = "/tmp/seatunnel/update/dst"
+    update_strategy = "strict"
+    compare_mode = "checksum"
+  }
+}
+
+sink {
+  LocalFile {
+    path = "/tmp/seatunnel/update/dst"
+    tmp_path = "/tmp/seatunnel/update/tmp"
+    file_format_type = "binary"
+  }
+}
+
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/fstp/SftpFileIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/fstp/SftpFileIT.java
index d8bc2efbb9..756759e078 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/fstp/SftpFileIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/fstp/SftpFileIT.java
@@ -31,6 +31,7 @@ import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.lifecycle.Startables;
 import org.testcontainers.shaded.com.github.dockerjava.core.command.ExecStartResultCallback;
@@ -61,6 +62,8 @@ public class SftpFileIT extends TestSuiteBase implements TestResource {
 
     private static final int SFTP_BIND_PORT = 2222;
 
+    private static final String SFTP_CONTAINER_HOME = "/home/seatunnel";
+
     private static final String USERNAME = "seatunnel";
 
     private static final String PASSWORD = "pass";
@@ -211,6 +214,32 @@ public class SftpFileIT extends TestSuiteBase implements TestResource {
         Assertions.assertEquals(getFileListFromContainer(homePath + sink02).size(), 1);
     }
 
+    @TestTemplate
+    public void testSftpBinaryUpdateModeDistcp(TestContainer container)
+            throws IOException, InterruptedException {
+        resetUpdateTestPath();
+        putSftpFile(SFTP_CONTAINER_HOME + "/tmp/seatunnel/update/src/test.bin", "abc");
+
+        TestHelper helper = new TestHelper(container);
+        helper.execute("/text/sftp_binary_update_distcp.conf");
+        Assertions.assertEquals(
+                "abc", readSftpFile(SFTP_CONTAINER_HOME + "/tmp/seatunnel/update/dst/test.bin"));
+
+        // Make target newer with same length, distcp strategy should SKIP overwrite.
+        putSftpFile(SFTP_CONTAINER_HOME + "/tmp/seatunnel/update/dst/test.bin", "zzz");
+        helper.execute("/text/sftp_binary_update_distcp.conf");
+        Assertions.assertEquals(
+                "zzz", readSftpFile(SFTP_CONTAINER_HOME + "/tmp/seatunnel/update/dst/test.bin"));
+
+        // Change source length, distcp strategy should COPY overwrite.
+        putSftpFile(SFTP_CONTAINER_HOME + "/tmp/seatunnel/update/src/test.bin", "abcd");
+        helper.execute("/text/sftp_binary_update_distcp.conf");
+        Assertions.assertEquals(
+                "abcd", readSftpFile(SFTP_CONTAINER_HOME + "/tmp/seatunnel/update/dst/test.bin"));
+
+        deleteFileFromContainer(SFTP_CONTAINER_HOME + "/tmp/seatunnel/update");
+    }
+
     @TestTemplate
     public void testMultipleTableAndSaveMode(TestContainer container)
             throws IOException, InterruptedException {
@@ -244,6 +273,49 @@ public class SftpFileIT extends TestSuiteBase implements TestResource {
         Assertions.assertEquals(getFileListFromContainer(homePath + path4).size(), 2);
     }
 
+    private void resetUpdateTestPath() throws IOException, InterruptedException {
+        deleteFileFromContainer(SFTP_CONTAINER_HOME + "/tmp/seatunnel/update");
+        Container.ExecResult mkdirResult =
+                sftpContainer.execInContainer(
+                        "sh",
+                        "-c",
+                        "mkdir -p "
+                                + SFTP_CONTAINER_HOME
+                                + "/tmp/seatunnel/update/src "
+                                + SFTP_CONTAINER_HOME
+                                + "/tmp/seatunnel/update/dst "
+                                + SFTP_CONTAINER_HOME
+                                + "/tmp/seatunnel/update/tmp");
+        Assertions.assertEquals(0, mkdirResult.getExitCode(), mkdirResult.getStderr());
+        sftpContainer.execInContainer(
+                "sh",
+                "-c",
+                "chmod -R 777 " + SFTP_CONTAINER_HOME + "/tmp/seatunnel/update || true");
+    }
+
+    private void putSftpFile(String containerPath, String content)
+            throws IOException, InterruptedException {
+        String command =
+                "mkdir -p $(dirname '"
+                        + containerPath
+                        + "') && printf '"
+                        + content
+                        + "' > '"
+                        + containerPath
+                        + "' && chmod 666 '"
+                        + containerPath
+                        + "'";
+        Container.ExecResult putResult = sftpContainer.execInContainer("sh", "-c", command);
+        Assertions.assertEquals(0, putResult.getExitCode(), putResult.getStderr());
+    }
+
+    private String readSftpFile(String containerPath) throws IOException, InterruptedException {
+        Container.ExecResult catResult =
+                sftpContainer.execInContainer("sh", "-c", "cat '" + containerPath + "'");
+        Assertions.assertEquals(0, catResult.getExitCode(), catResult.getStderr());
+        return catResult.getStdout() == null ? "" : catResult.getStdout().trim();
+    }
+
     @SneakyThrows
     private List<String> getFileListFromContainer(String path) {
         String command = "ls -1 " + path;
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/text/sftp_binary_update_distcp.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/text/sftp_binary_update_distcp.conf
new file mode 100644
index 0000000000..f1dbc21721
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/text/sftp_binary_update_distcp.conf
@@ -0,0 +1,59 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+
+  # You can set spark configuration here
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 1
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  SftpFile {
+    host = "sftp"
+    port = 22
+    user = seatunnel
+    password = pass
+
+    path = "tmp/seatunnel/update/src"
+    file_format_type = "binary"
+
+    sync_mode = "update"
+    target_path = "tmp/seatunnel/update/dst"
+    update_strategy = "distcp"
+    compare_mode = "len_mtime"
+  }
+}
+
+sink {
+  SftpFile {
+    host = "sftp"
+    port = 22
+    user = seatunnel
+    password = pass
+
+    path = "tmp/seatunnel/update/dst"
+    tmp_path = "tmp/seatunnel/update/tmp"
+    file_format_type = "binary"
+  }
+}
+
