This is an automated email from the ASF dual-hosted git repository.

shenghang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new c134273bcf [Feature][Connector-File-Hadoop] Support sync_mode=update 
for HdfsFile source (binary) (#10268)
c134273bcf is described below

commit c134273bcfba2ef6fe138abf79d92268e6acc81c
Author: yzeng1618 <[email protected]>
AuthorDate: Sun Jan 11 20:52:33 2026 +0800

    [Feature][Connector-File-Hadoop] Support sync_mode=update for HdfsFile 
source (binary) (#10268)
    
    Co-authored-by: zengyi <[email protected]>
---
 docs/en/connector-v2/source/HdfsFile.md            |  44 ++-
 docs/zh/connector-v2/source/HdfsFile.md            |  44 ++-
 .../file/hdfs/source/BaseHdfsFileSource.java       |  10 +-
 .../file/config/BaseFileSourceConfig.java          |   7 +
 .../file/config/FileBaseSourceOptions.java         |  47 +++
 .../seatunnel/file/config/FileCompareMode.java     |  25 ++
 .../seatunnel/file/config/FileSyncMode.java        |  25 ++
 .../seatunnel/file/config/FileUpdateStrategy.java  |  25 ++
 .../file/hadoop/HadoopFileSystemProxy.java         |   5 +
 .../file/source/reader/AbstractReadStrategy.java   | 354 ++++++++++++++++++++-
 .../file/source/reader/UpdateSyncModeTest.java     | 245 ++++++++++++++
 .../file/hdfs/source/HdfsFileSourceFactory.java    |  10 +
 .../file/hdfs/HdfsFileSourceConfigTest.java        |  42 +++
 .../e2e/connector/file/hdfs/HdfsFileIT.java        |  68 ++++
 .../test/resources/hdfs_binary_update_distcp.conf  |  53 +++
 .../hdfs_binary_update_strict_checksum.conf        |  53 +++
 16 files changed, 1048 insertions(+), 9 deletions(-)

diff --git a/docs/en/connector-v2/source/HdfsFile.md 
b/docs/en/connector-v2/source/HdfsFile.md
index 2ebb6de0d4..dc9827004c 100644
--- a/docs/en/connector-v2/source/HdfsFile.md
+++ b/docs/en/connector-v2/source/HdfsFile.md
@@ -80,6 +80,11 @@ Read data from hdfs file system.
 | null_format                | string  | no       | -                          
 | Only used when file_format_type is text. null_format to define which strings 
can be represented as null. e.g: `\N`                                           
                                                                                
                                                                                
                 |
 | binary_chunk_size          | int     | no       | 1024                       
 | Only used when file_format_type is binary. The chunk size (in bytes) for 
reading binary files. Default is 1024 bytes. Larger values may improve 
performance for large files but use more memory.                                
                                                                                
                              |
 | binary_complete_file_mode  | boolean | no       | false                      
 | Only used when file_format_type is binary. Whether to read the complete file 
as a single chunk instead of splitting into chunks. When enabled, the entire 
file content will be read into memory at once. Default is false.                
                                                                                
                    |
+| sync_mode                  | string  | no       | full                       
 | File sync mode. Supported values: `full`, `update`. When `update`, the 
source compares files between source/target and only reads new/changed files 
(currently only supports `file_format_type=binary`).                            
                                                                                
                         |
+| target_path                | string  | no       | -                          
 | Only used when `sync_mode=update`. Target base path used for comparison (it 
should usually be the same as sink `path`).                                     
                                                                                
                                                                                
                  |
+| target_hadoop_conf         | map     | no       | -                          
 | Only used when `sync_mode=update`. Extra Hadoop configuration for target 
filesystem. You can set `fs.defaultFS` in this map to override target 
defaultFS.                                                                      
                                                                                
                             |
+| update_strategy            | string  | no       | distcp                     
 | Only used when `sync_mode=update`. Supported values: `distcp` (default), 
`strict`.                                                                       
                                                                                
                                                                                
                    |
+| compare_mode               | string  | no       | len_mtime                  
 | Only used when `sync_mode=update`. Supported values: `len_mtime` (default), 
`checksum` (only valid when `update_strategy=strict`).                          
                                                                                
                                                                                
                |
 | common-options             |         | no       | -                          
 | Source plugin common parameters, please refer to [Source Common 
Options](../source-common-options.md) for details.                              
                                                                                
                                                                                
                              |
 | file_filter_modified_start | string  | no       | -                          
 | File modification time filter. The connector will filter some files base on 
the last modification start time (include start time). The default data format 
is `yyyy-MM-dd HH:mm:ss`.                                                       
                                                                                
                   |
 | file_filter_modified_end   | string  | no       | -                          
 | File modification time filter. The connector will filter some files base on 
the last modification end time (not include end time). The default data format 
is `yyyy-MM-dd HH:mm:ss`.                                                       
                                                                                
                   |
@@ -212,6 +217,43 @@ Only used when file_format_type is binary.
 
 Whether to read the complete file as a single chunk instead of splitting into 
chunks. When enabled, the entire file content will be read into memory at once. 
Default is false.
 
+### sync_mode [string]
+
+File sync mode. Supported values: `full` (default), `update`.
+
+When `sync_mode=update`, the source will compare files between source/target 
and only read new/changed files (currently only supports 
`file_format_type=binary`).
+
+### target_path [string]
+
+Only used when `sync_mode=update`.
+
+Target base path used for comparison (it should usually be the same as sink 
`path`).
+
+### target_hadoop_conf [map]
+
+Only used when `sync_mode=update`.
+
+Extra Hadoop configuration for target filesystem (optional). If not set, it 
reuses the source filesystem configuration.
+
+You can set `fs.defaultFS` in this map to override target defaultFS, e.g. 
`"fs.defaultFS" = "hdfs://nn2:9000"`.
+
+### update_strategy [string]
+
+Only used when `sync_mode=update`. Supported values: `distcp` (default), 
`strict`.
+
+- `distcp`: similar to `distcp -update`:
+  - target file not exists → COPY
+  - length differs → COPY
+  - `mtime(source) > mtime(target)` → COPY
+  - else → SKIP
+- `strict`: strict consistency, decided by `compare_mode`.
+
+### compare_mode [string]
+
+Only used when `sync_mode=update`. Supported values: `len_mtime` (default), 
`checksum`.
+
+- `len_mtime`: SKIP only when both `len` and `mtime` are equal, otherwise COPY.
+- `checksum`: SKIP only when `len` is equal and Hadoop `getFileChecksum` is 
equal, otherwise COPY (only valid when `update_strategy=strict`).
 ### quote_char [string]
 
 A single character that encloses CSV fields, allowing fields with commas, line 
breaks, or quotes to be read correctly.
@@ -335,4 +377,4 @@ sink {
 
 ## Changelog
 
-<ChangeLog />
\ No newline at end of file
+<ChangeLog />
diff --git a/docs/zh/connector-v2/source/HdfsFile.md 
b/docs/zh/connector-v2/source/HdfsFile.md
index 05b49ec67f..2fba533ec8 100644
--- a/docs/zh/connector-v2/source/HdfsFile.md
+++ b/docs/zh/connector-v2/source/HdfsFile.md
@@ -80,6 +80,11 @@ import ChangeLog from 
'../changelog/connector-file-hadoop.md';
 | null_format                | string  | 否    | -                   | 仅在 
file_format_type 为 text 时使用。null_format 定义哪些字符串可以表示为 null。例如:`\N`               
                                                                                
              |
 | binary_chunk_size          | int     | 否    | 1024                | 仅在 
file_format_type 为 binary 时使用。读取二进制文件的块大小(以字节为单位)。默认为 1024 
字节。较大的值可能会提高大文件的性能,但会使用更多内存。                                                    
                                   |
 | binary_complete_file_mode  | boolean | 否    | false               | 仅在 
file_format_type 为 binary 时使用。是否将完整文件作为单个块读取,而不是分割成块。启用时,整个文件内容将一次性读入内存。默认为 
false。                                                                          
                  |
+| sync_mode                  | string  | 否    | full                | 
文件同步模式,支持:`full`(默认)、`update`。当 `update` 时,对源/目标进行对比,只读取新增/变更文件(目前仅支持 
`file_format_type=binary`)。                                                     
                                                     |
+| target_path                | string  | 否    | -                   | 仅在 
`sync_mode=update` 时使用。目标端基础路径(通常应与 sink 的 `path` 一致),用于对比同相对路径文件。              
                                                                                
                       |
+| target_hadoop_conf         | map     | 否    | -                   | 仅在 
`sync_mode=update` 时使用。目标端 Hadoop 配置(可选),可在其中设置 `fs.defaultFS` 覆盖目标 defaultFS。  
                                                                                
                               |
+| update_strategy            | string  | 否    | distcp              | 仅在 
`sync_mode=update` 时使用。支持:`distcp`(默认)、`strict`。                                
                                                                                
                                 |
+| compare_mode               | string  | 否    | len_mtime           | 仅在 
`sync_mode=update` 时使用。支持:`len_mtime`(默认)、`checksum`(仅在 
`update_strategy=strict` 时可用)。                                                  
                                                           |
 | common-options             |         | 否    | -                   | 
数据源插件通用参数,请参阅 [数据源通用选项](../source-common-options.md) 了解详情。                      
                                                                                
                 |
 | file_filter_modified_start | string  | 否    | -                   | 
按照最后修改时间过滤文件。 要过滤的开始时间(包括改时间),时间格式是:`yyyy-MM-dd HH:mm:ss`                       
                                                                                
                 |
 | file_filter_modified_end   | string  | 否    | -                   | 
按照最后修改时间过滤文件。 要过滤的结束时间(不包括改时间),时间格式是:`yyyy-MM-dd HH:mm:ss`                      
                                                                                
                 |
@@ -213,6 +218,43 @@ abc.*
 
 是否将完整文件作为单个块读取,而不是分割成块。启用时,整个文件内容将一次性读入内存。默认为 false。
 
+### sync_mode [string]
+
+文件同步模式,支持:`full`(默认)、`update`。
+
+当 `sync_mode=update` 时,会在读取端对源/目标进行对比,只读取新增/变更文件(目前仅支持 
`file_format_type=binary`)。
+
+### target_path [string]
+
+仅在 `sync_mode=update` 时使用。
+
+目标端基础路径(通常应与 sink 的 `path` 保持一致),用于对比同相对路径的目标文件是否存在/是否需要更新。
+
+### target_hadoop_conf [map]
+
+仅在 `sync_mode=update` 时使用。
+
+用于访问目标文件系统的 Hadoop 配置(可选)。当不配置时默认复用 source 端的文件系统配置。
+
+可在该 map 中指定 `fs.defaultFS` 来覆盖目标端 defaultFS,例如:`"fs.defaultFS" = 
"hdfs://nn2:9000"`。
+
+### update_strategy [string]
+
+仅在 `sync_mode=update` 时使用。支持:`distcp`(默认)、`strict`。
+
+- `distcp`:更接近 `distcp -update` 的语义:
+  - 目标文件不存在 → COPY
+  - 长度不同 → COPY
+  - `mtime(source) > mtime(target)` → COPY
+  - 否则 → SKIP
+- `strict`:严格一致性,配合 `compare_mode` 判断是否 SKIP。
+
+### compare_mode [string]
+
+仅在 `sync_mode=update` 时使用。支持:`len_mtime`(默认)、`checksum`。
+
+- `len_mtime`:`len` 与 `mtime` 都相同才 SKIP,否则 COPY。
+- `checksum`:要求 `len` 相同且 Hadoop `getFileChecksum` 相同才 SKIP,否则 COPY(仅在 
`update_strategy=strict` 时生效)。
 ### quote_char [string]
 
 用于包裹 CSV 字段的单字符,可保证包含逗号、换行符或引号的字段被正确解析。
@@ -336,4 +378,4 @@ sink {
 
 ## 变更日志
 
-<ChangeLog />
\ No newline at end of file
+<ChangeLog />
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
index 7e93e39cec..1a473b2bf3 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
@@ -136,8 +136,14 @@ public abstract class BaseHdfsFileSource extends 
BaseFileSource {
             }
         } else {
             if (filePaths.isEmpty()) {
-                // When the directory is empty, distribute default behavior 
schema
-                rowType = CatalogTableUtil.buildSimpleTextSchema();
+                // When there are no files (including sync_mode=update 
filtered all files), choose a
+                // compatible schema so that downstream can initialize 
correctly.
+                if (fileFormat == FileFormat.BINARY) {
+                    rowType = readStrategy.getSeaTunnelRowTypeInfo(path);
+                } else {
+                    // fallback schema when schema cannot be inferred from 
files
+                    rowType = CatalogTableUtil.buildSimpleTextSchema();
+                }
                 return;
             }
             try {
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSourceConfig.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSourceConfig.java
index 56f8b234cb..b1bb7b0fe2 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSourceConfig.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSourceConfig.java
@@ -88,6 +88,13 @@ public abstract class BaseFileSourceConfig implements 
Serializable {
             catalogTable = CatalogTableUtil.buildSimpleTextTable();
         }
         if (CollectionUtils.isEmpty(filePaths)) {
+            // When there are no files (including sync_mode=update filtered 
all files), choose a
+            // compatible schema so that downstream can initialize correctly.
+            if (fileFormat == FileFormat.BINARY) {
+                String rootPath = 
readonlyConfig.get(FileBaseSourceOptions.FILE_PATH);
+                return newCatalogTable(
+                        catalogTable, 
readStrategy.getSeaTunnelRowTypeInfo(rootPath));
+            }
             return catalogTable;
         }
         switch (fileFormat) {
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileBaseSourceOptions.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileBaseSourceOptions.java
index a103220c99..67adcc56c6 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileBaseSourceOptions.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileBaseSourceOptions.java
@@ -21,7 +21,9 @@ import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
 import org.apache.seatunnel.format.text.constant.TextFormatConstant;
 
+import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 
 public class FileBaseSourceOptions extends FileBaseOptions {
     public static final String DEFAULT_ROW_DELIMITER = "\n";
@@ -128,6 +130,51 @@ public class FileBaseSourceOptions extends FileBaseOptions 
{
                             "Whether to read the complete file as a single 
chunk instead of splitting into chunks. "
                                     + "When enabled, the entire file content 
will be read into memory at once.Only valid when file_format_type is binary.");
 
+    public static final Option<FileSyncMode> SYNC_MODE =
+            Options.key("sync_mode")
+                    .singleChoice(
+                            FileSyncMode.class,
+                            Arrays.asList(FileSyncMode.FULL, 
FileSyncMode.UPDATE))
+                    .defaultValue(FileSyncMode.FULL)
+                    .withDescription(
+                            "File sync mode. Supported values: full, update. "
+                                    + "When set to update, the source will 
compare with target and only read new/changed files. "
+                                    + "Currently, update mode only supports 
file_format_type=binary.");
+
+    public static final Option<String> TARGET_PATH =
+            Options.key("target_path")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Target base path for sync_mode=update 
comparison.");
+
+    public static final Option<Map<String, String>> TARGET_HADOOP_CONF =
+            Options.key("target_hadoop_conf")
+                    .mapType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Extra Hadoop configuration for target filesystem 
in sync_mode=update. "
+                                    + "Use key 'fs.defaultFS' to override 
target defaultFS if needed.");
+
+    public static final Option<FileUpdateStrategy> UPDATE_STRATEGY =
+            Options.key("update_strategy")
+                    .singleChoice(
+                            FileUpdateStrategy.class,
+                            Arrays.asList(FileUpdateStrategy.DISTCP, 
FileUpdateStrategy.STRICT))
+                    .defaultValue(FileUpdateStrategy.DISTCP)
+                    .withDescription(
+                            "Update strategy when sync_mode=update. Supported 
values: distcp, strict. "
+                                    + "distcp behaves like 'distcp -update' 
(len+mtime, and does not require equal mtime). "
+                                    + "strict requires exact consistency 
depending on compare_mode.");
+
+    public static final Option<FileCompareMode> COMPARE_MODE =
+            Options.key("compare_mode")
+                    .singleChoice(
+                            FileCompareMode.class,
+                            Arrays.asList(FileCompareMode.LEN_MTIME, 
FileCompareMode.CHECKSUM))
+                    .defaultValue(FileCompareMode.LEN_MTIME)
+                    .withDescription(
+                            "Compare mode when sync_mode=update. Supported 
values: len_mtime, checksum. "
+                                    + "checksum uses Hadoop 
FileSystem#getFileChecksum, only valid when update_strategy=strict.");
     public static final Option<String> QUOTE_CHAR =
             Options.key("quote_char")
                     .stringType()
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileCompareMode.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileCompareMode.java
new file mode 100644
index 0000000000..0d398866a0
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileCompareMode.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.config;
+
+import java.io.Serializable;
+
+public enum FileCompareMode implements Serializable {
+    LEN_MTIME,
+    CHECKSUM
+}
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSyncMode.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSyncMode.java
new file mode 100644
index 0000000000..a4941101bf
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSyncMode.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.config;
+
+import java.io.Serializable;
+
+public enum FileSyncMode implements Serializable {
+    FULL,
+    UPDATE
+}
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileUpdateStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileUpdateStrategy.java
new file mode 100644
index 0000000000..1acf1d3634
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileUpdateStrategy.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.config;
+
+import java.io.Serializable;
+
+public enum FileUpdateStrategy implements Serializable {
+    DISTCP,
+    STRICT
+}
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hadoop/HadoopFileSystemProxy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hadoop/HadoopFileSystemProxy.java
index 0c62d26bd4..cdee0214ec 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hadoop/HadoopFileSystemProxy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hadoop/HadoopFileSystemProxy.java
@@ -27,6 +27,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
@@ -188,6 +189,10 @@ public class HadoopFileSystemProxy implements 
Serializable, Closeable {
         return execute(() -> getFileSystem().getFileStatus(new 
Path(filePath)));
     }
 
+    public FileChecksum getFileChecksum(String filePath) throws IOException {
+        return execute(() -> getFileSystem().getFileChecksum(new 
Path(filePath)));
+    }
+
     public FSDataOutputStream getOutputStream(String filePath) throws 
IOException {
         return execute(() -> getFileSystem().create(new Path(filePath), true));
     }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
index c0e8d2a90f..e55edc13f2 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
@@ -18,8 +18,11 @@
 package org.apache.seatunnel.connectors.seatunnel.file.source.reader;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigObject;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueType;
 import org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils;
 
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
@@ -30,8 +33,12 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.utils.SeaTunnelException;
 import 
org.apache.seatunnel.connectors.seatunnel.file.config.ArchiveCompressFormat;
 import 
org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileCompareMode;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileSyncMode;
+import 
org.apache.seatunnel.connectors.seatunnel.file.config.FileUpdateStrategy;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
 import 
org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;
 import 
org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit;
 
@@ -40,12 +47,15 @@ import 
org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
 import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
 import org.apache.commons.compress.compressors.gzip.GzipParameters;
 import org.apache.commons.io.input.BoundedInputStream;
+import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
 
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.math.BigDecimal;
@@ -58,6 +68,7 @@ import java.util.Arrays;
 import java.util.Date;
 import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
 import java.util.regex.Pattern;
@@ -101,10 +112,24 @@ public abstract class AbstractReadStrategy implements 
ReadStrategy {
 
     protected boolean enableSplitFile;
 
+    protected String sourceRootPath;
+    protected boolean enableUpdateSync;
+    protected String targetPath;
+    protected FileUpdateStrategy updateStrategy =
+            FileBaseSourceOptions.UPDATE_STRATEGY.defaultValue();
+    protected FileCompareMode compareMode = 
FileBaseSourceOptions.COMPARE_MODE.defaultValue();
+    protected Map<String, String> targetHadoopConf;
+    protected transient HadoopFileSystemProxy targetHadoopFileSystemProxy;
+    protected transient boolean shareTargetFileSystemProxy;
+    protected transient boolean checksumUnavailableWarned;
+
     @Override
     public void init(HadoopConf conf) {
         this.hadoopConf = conf;
         this.hadoopFileSystemProxy = new HadoopFileSystemProxy(hadoopConf);
+        if (enableUpdateSync) {
+            initTargetHadoopFileSystemProxy();
+        }
     }
 
     @Override
@@ -140,14 +165,18 @@ public abstract class AbstractReadStrategy implements 
ReadStrategy {
                     if (!readPartitions.isEmpty()) {
                         for (String readPartition : readPartitions) {
                             if (filePath.contains(readPartition)) {
-                                fileNames.add(filePath);
-                                this.fileNames.add(filePath);
+                                if (shouldSyncFileInUpdateMode(fileStatus)) {
+                                    fileNames.add(filePath);
+                                    this.fileNames.add(filePath);
+                                }
                                 break;
                             }
                         }
                     } else {
-                        fileNames.add(filePath);
-                        this.fileNames.add(filePath);
+                        if (shouldSyncFileInUpdateMode(fileStatus)) {
+                            fileNames.add(filePath);
+                            this.fileNames.add(filePath);
+                        }
                     }
                 }
             }
@@ -234,7 +263,9 @@ public abstract class AbstractReadStrategy implements 
ReadStrategy {
             this.pattern = Pattern.compile(filterPattern);
             // because 'ConfigFactory.systemProperties()' has a 'path' 
parameter, it is necessary to
             // obtain 'path' under the premise of 'FILE_FILTER_PATTERN'
-            if (pluginConfig.hasPath(FileBaseSourceOptions.FILE_PATH.key())) {
+            if (pluginConfig.hasPath(FileBaseSourceOptions.FILE_PATH.key())
+                    && 
pluginConfig.getValue(FileBaseSourceOptions.FILE_PATH.key()).valueType()
+                            == ConfigValueType.STRING) {
                 fileBasePath = 
pluginConfig.getString(FileBaseSourceOptions.FILE_PATH.key());
             }
         }
@@ -254,6 +285,25 @@ public abstract class AbstractReadStrategy implements 
ReadStrategy {
             enableSplitFile =
                     
pluginConfig.getBoolean(FileBaseSourceOptions.ENABLE_FILE_SPLIT.key());
         }
+
+        if (pluginConfig.hasPath(FileBaseSourceOptions.FILE_PATH.key())
+                && 
pluginConfig.getValue(FileBaseSourceOptions.FILE_PATH.key()).valueType()
+                        == ConfigValueType.STRING) {
+            sourceRootPath = 
pluginConfig.getString(FileBaseSourceOptions.FILE_PATH.key());
+        }
+
+        FileSyncMode syncMode = FileBaseSourceOptions.SYNC_MODE.defaultValue();
+        if (pluginConfig.hasPath(FileBaseSourceOptions.SYNC_MODE.key())) {
+            syncMode =
+                    parseEnumValue(
+                            FileSyncMode.class,
+                            
pluginConfig.getString(FileBaseSourceOptions.SYNC_MODE.key()),
+                            FileBaseSourceOptions.SYNC_MODE.key());
+        }
+        enableUpdateSync = syncMode == FileSyncMode.UPDATE;
+        if (enableUpdateSync) {
+            validateUpdateSyncConfig(pluginConfig);
+        }
     }
 
     @Override
@@ -473,10 +523,304 @@ public abstract class AbstractReadStrategy implements 
ReadStrategy {
     @Override
     public void close() throws IOException {
         try {
+            if (targetHadoopFileSystemProxy != null && 
!shareTargetFileSystemProxy) {
+                targetHadoopFileSystemProxy.close();
+            }
             if (hadoopFileSystemProxy != null) {
                 hadoopFileSystemProxy.close();
             }
         } catch (Exception ignore) {
         }
     }
+
+    private void validateUpdateSyncConfig(Config pluginConfig) {
+        if 
(!pluginConfig.hasPath(FileBaseSourceOptions.FILE_FORMAT_TYPE.key())) {
+            throw new FileConnectorException(
+                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                    "When sync_mode=update, file_format_type must be set.");
+        }
+        FileFormat fileFormat =
+                FileFormat.valueOf(
+                        pluginConfig
+                                
.getString(FileBaseSourceOptions.FILE_FORMAT_TYPE.key())
+                                .toUpperCase());
+        if (fileFormat != FileFormat.BINARY) {
+            throw new FileConnectorException(
+                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                    "sync_mode=update currently only supports 
file_format_type=binary.");
+        }
+
+        if (!pluginConfig.hasPath(FileBaseSourceOptions.TARGET_PATH.key())
+                || StringUtils.isBlank(
+                        
pluginConfig.getString(FileBaseSourceOptions.TARGET_PATH.key()))) {
+            throw new FileConnectorException(
+                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                    "When sync_mode=update, target_path must be set.");
+        }
+        targetPath = 
pluginConfig.getString(FileBaseSourceOptions.TARGET_PATH.key()).trim();
+
+        updateStrategy = FileBaseSourceOptions.UPDATE_STRATEGY.defaultValue();
+        if (pluginConfig.hasPath(FileBaseSourceOptions.UPDATE_STRATEGY.key())) 
{
+            updateStrategy =
+                    parseEnumValue(
+                            FileUpdateStrategy.class,
+                            
pluginConfig.getString(FileBaseSourceOptions.UPDATE_STRATEGY.key()),
+                            FileBaseSourceOptions.UPDATE_STRATEGY.key());
+        }
+
+        compareMode = FileBaseSourceOptions.COMPARE_MODE.defaultValue();
+        if (pluginConfig.hasPath(FileBaseSourceOptions.COMPARE_MODE.key())) {
+            compareMode =
+                    parseEnumValue(
+                            FileCompareMode.class,
+                            
pluginConfig.getString(FileBaseSourceOptions.COMPARE_MODE.key()),
+                            FileBaseSourceOptions.COMPARE_MODE.key());
+        }
+        if (updateStrategy == FileUpdateStrategy.DISTCP
+                && compareMode != FileCompareMode.LEN_MTIME) {
+            throw new FileConnectorException(
+                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                    "compare_mode="
+                            + compareMode.name().toLowerCase(Locale.ROOT)
+                            + " is not supported when 
update_strategy=distcp.");
+        }
+
+        if 
(pluginConfig.hasPath(FileBaseSourceOptions.TARGET_HADOOP_CONF.key())) {
+            ConfigObject configObject =
+                    
pluginConfig.getObject(FileBaseSourceOptions.TARGET_HADOOP_CONF.key());
+            Map<String, Object> raw = configObject.unwrapped();
+            Map<String, String> conf = new LinkedHashMap<>(raw.size());
+            raw.forEach((k, v) -> conf.put(k, v == null ? null : 
String.valueOf(v)));
+            targetHadoopConf = conf;
+        }
+    }
+
+    private void initTargetHadoopFileSystemProxy() {
+        HadoopConf targetConf = buildTargetHadoopConf();
+        if (targetConf == this.hadoopConf) {
+            targetHadoopFileSystemProxy = this.hadoopFileSystemProxy;
+            shareTargetFileSystemProxy = true;
+        } else {
+            targetHadoopFileSystemProxy = new 
HadoopFileSystemProxy(targetConf);
+            shareTargetFileSystemProxy = false;
+        }
+    }
+
+    private HadoopConf buildTargetHadoopConf() {
+        if (!enableUpdateSync) {
+            return this.hadoopConf;
+        }
+        Map<String, String> extraOptions =
+                targetHadoopConf == null
+                        ? new LinkedHashMap<>()
+                        : new LinkedHashMap<>(targetHadoopConf);
+
+        String fsDefaultNameKey = hadoopConf.getFsDefaultNameKey();
+        String targetDefaultFs = extraOptions.remove(fsDefaultNameKey);
+
+        if (StringUtils.isBlank(targetDefaultFs)) {
+            targetDefaultFs = tryDeriveDefaultFsFromPath(targetPath);
+        }
+        if (StringUtils.isBlank(targetDefaultFs)) {
+            targetDefaultFs = hadoopConf.getHdfsNameKey();
+        }
+
+        boolean needNewConf =
+                !extraOptions.isEmpty()
+                        || !Objects.equals(targetDefaultFs, 
hadoopConf.getHdfsNameKey());
+        if (!needNewConf) {
+            return this.hadoopConf;
+        }
+
+        HadoopConf conf = new HadoopConf(targetDefaultFs);
+        conf.setHdfsSitePath(hadoopConf.getHdfsSitePath());
+        conf.setRemoteUser(hadoopConf.getRemoteUser());
+        conf.setKrb5Path(hadoopConf.getKrb5Path());
+        conf.setKerberosPrincipal(hadoopConf.getKerberosPrincipal());
+        conf.setKerberosKeytabPath(hadoopConf.getKerberosKeytabPath());
+        conf.setExtraOptions(extraOptions);
+        return conf;
+    }
+
+    private static String tryDeriveDefaultFsFromPath(String basePath) {
+        if (StringUtils.isBlank(basePath)) {
+            return null;
+        }
+        try {
+            Path path = new Path(basePath);
+            if (path.toUri().getScheme() == null) {
+                return null;
+            }
+            if (path.toUri().getAuthority() == null) {
+                return null;
+            }
+            return path.toUri().getScheme() + "://" + 
path.toUri().getAuthority();
+        } catch (Exception e) {
+            return null;
+        }
+    }
+
    /**
     * Decides whether a discovered source file must be read in sync_mode=update.
     *
     * <p>Returns {@code true} (sync the file) when update sync is disabled, when
     * the corresponding file is missing on the target, when lengths differ, or
     * when the configured strategy/compare mode considers the files different.
     */
    private boolean shouldSyncFileInUpdateMode(FileStatus sourceFileStatus) throws IOException {
        if (!enableUpdateSync) {
            // Not in update mode: every discovered file is read.
            return true;
        }
        if (targetHadoopFileSystemProxy == null) {
            // Lazily initialized: setPluginConfig may run before init().
            initTargetHadoopFileSystemProxy();
        }
        // Map the source file onto the equivalent path under target_path.
        String sourceFilePath = sourceFileStatus.getPath().toString();
        String relativePath = resolveRelativePath(sourceRootPath, sourceFilePath);
        String targetFilePath = buildTargetFilePath(targetPath, relativePath);

        FileStatus targetFileStatus;
        try {
            targetFileStatus = targetHadoopFileSystemProxy.getFileStatus(targetFilePath);
        } catch (FileNotFoundException e) {
            // Target file absent -> always copy.
            return true;
        }

        // A length difference always means the files differ, regardless of strategy.
        long sourceLen = sourceFileStatus.getLen();
        long targetLen = targetFileStatus.getLen();
        if (sourceLen != targetLen) {
            return true;
        }

        long sourceMtime = sourceFileStatus.getModificationTime();
        long targetMtime = targetFileStatus.getModificationTime();

        if (updateStrategy == FileUpdateStrategy.DISTCP) {
            // distcp -update semantics: same length and target at least as new -> skip.
            return sourceMtime > targetMtime;
        }

        if (updateStrategy == FileUpdateStrategy.STRICT) {
            if (compareMode == FileCompareMode.LEN_MTIME) {
                // strict/len_mtime: any mtime difference forces a copy.
                return sourceMtime != targetMtime;
            }
            if (compareMode == FileCompareMode.CHECKSUM) {
                FileChecksum sourceChecksum = hadoopFileSystemProxy.getFileChecksum(sourceFilePath);
                FileChecksum targetChecksum =
                        targetHadoopFileSystemProxy.getFileChecksum(targetFilePath);
                if (sourceChecksum == null || targetChecksum == null) {
                    // Some file systems (e.g. the local FS) expose no checksum; fall
                    // back to streaming content comparison and warn only once.
                    if (!checksumUnavailableWarned) {
                        log.warn(
                                "File checksum is not available, fallback to content comparison. source={}, target={}",
                                sourceFilePath,
                                targetFilePath);
                        checksumUnavailableWarned = true;
                    }
                    try {
                        return !fileContentEquals(sourceFilePath, targetFilePath);
                    } catch (Exception e) {
                        // Failure to compare is treated as "differs" -> copy defensively.
                        log.warn(
                                "Fallback content comparison failed, fallback to COPY. source={}, target={}",
                                sourceFilePath,
                                targetFilePath,
                                e);
                        return true;
                    }
                }
                return !checksumEquals(sourceChecksum, targetChecksum);
            }
        }

        // Unknown strategy/compare-mode combination: copy defensively.
        return true;
    }
+
+    private static boolean checksumEquals(FileChecksum source, FileChecksum 
target) {
+        if (source == null || target == null) {
+            return false;
+        }
+        return Objects.equals(source.getAlgorithmName(), 
target.getAlgorithmName())
+                && source.getLength() == target.getLength()
+                && Arrays.equals(source.getBytes(), target.getBytes());
+    }
+
+    private boolean fileContentEquals(String sourceFilePath, String 
targetFilePath)
+            throws IOException {
+        try (InputStream sourceIn = 
hadoopFileSystemProxy.getInputStream(sourceFilePath);
+                InputStream targetIn = 
targetHadoopFileSystemProxy.getInputStream(targetFilePath)) {
+            byte[] sourceBuffer = new byte[8 * 1024];
+            byte[] targetBuffer = new byte[8 * 1024];
+
+            while (true) {
+                int sourceRead = sourceIn.read(sourceBuffer);
+                int targetRead = targetIn.read(targetBuffer);
+                if (sourceRead != targetRead) {
+                    return false;
+                }
+                if (sourceRead == -1) {
+                    return true;
+                }
+                for (int i = 0; i < sourceRead; i++) {
+                    if (sourceBuffer[i] != targetBuffer[i]) {
+                        return false;
+                    }
+                }
+            }
+        }
+    }
+
+    private static String buildTargetFilePath(String targetBasePath, String 
relativePath) {
+        String cleanRelativePath =
+                StringUtils.isBlank(relativePath)
+                        ? ""
+                        : (relativePath.startsWith("/") ? 
relativePath.substring(1) : relativePath);
+        return new Path(targetBasePath, cleanRelativePath).toString();
+    }
+
+    private static String resolveRelativePath(String basePath, String 
fullFilePath) {
+        String base = normalizePathPart(basePath);
+        String file = normalizePathPart(fullFilePath);
+        if (StringUtils.isBlank(file)) {
+            return "";
+        }
+        if (StringUtils.isBlank(base)) {
+            return new Path(file).getName();
+        }
+        if (Objects.equals(base, file)) {
+            return new Path(file).getName();
+        }
+        String basePrefix = base.endsWith("/") ? base : base + "/";
+        if (file.startsWith(basePrefix)) {
+            return file.substring(basePrefix.length());
+        }
+        int idx = file.indexOf(basePrefix);
+        if (idx >= 0) {
+            return file.substring(idx + basePrefix.length());
+        }
+        return new Path(file).getName();
+    }
+
+    private static String normalizePathPart(String path) {
+        if (StringUtils.isBlank(path)) {
+            return path;
+        }
+        try {
+            return new Path(path).toUri().getPath();
+        } catch (Exception e) {
+            return path;
+        }
+    }
+
+    private static <E extends Enum<E>> E parseEnumValue(
+            Class<E> enumClass, String rawValue, String optionKey) {
+        if (StringUtils.isBlank(rawValue)) {
+            throw new FileConnectorException(
+                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                    "Option '" + optionKey + "' must not be blank.");
+        }
+        String normalized = rawValue.trim().toUpperCase(Locale.ROOT);
+        for (E v : enumClass.getEnumConstants()) {
+            if (v.name().equalsIgnoreCase(normalized)) {
+                return v;
+            }
+        }
+        String supported =
+                Arrays.stream(enumClass.getEnumConstants())
+                        .map(e -> e.name().toLowerCase(Locale.ROOT))
+                        .reduce((a, b) -> a + ", " + b)
+                        .orElse("");
+        throw new FileConnectorException(
+                SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                "Unsupported " + optionKey + ": [" + rawValue + "], supported: 
" + supported + ".");
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/UpdateSyncModeTest.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/UpdateSyncModeTest.java
new file mode 100644
index 0000000000..9455eeef6f
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/UpdateSyncModeTest.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.source.reader;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.attribute.FileTime;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT;
+
// Unit tests for sync_mode=update: config validation plus the distcp/strict
// skip-or-copy decision, exercised against the local file system.
@DisabledOnOs(
        value = OS.WINDOWS,
        disabledReason =
                "Hadoop has windows problem, please refer https://cwiki.apache.org/confluence/display/HADOOP2/WindowsProblems")
class UpdateSyncModeTest {

    // Scratch directory holding the per-test source/target trees.
    @TempDir Path tempDir;

    // update_strategy=distcp only compares length+mtime; requesting checksum
    // must fail at configuration time.
    @Test
    void testDistcpDoesNotSupportChecksumCompareMode() throws Exception {
        Path sourceDir = tempDir.resolve("src");
        Path targetDir = tempDir.resolve("dst");

        try (BinaryReadStrategy strategy = new BinaryReadStrategy()) {
            Assertions.assertThrows(
                    FileConnectorException.class,
                    () ->
                            strategy.setPluginConfig(
                                    updateConfig(
                                            sourceDir.toUri().toString(),
                                            targetDir.toUri().toString(),
                                            "distcp",
                                            "checksum")));
        }
    }

    // sync_mode=update is restricted to file_format_type=binary.
    @Test
    void testUpdateModeOnlySupportsBinaryFormat() throws Exception {
        Path sourceDir = tempDir.resolve("src");
        Path targetDir = tempDir.resolve("dst");

        Map<String, Object> configMap = new HashMap<>();
        configMap.put("path", sourceDir.toUri().toString());
        configMap.put("file_format_type", "text");
        configMap.put("sync_mode", "update");
        configMap.put("target_path", targetDir.toUri().toString());

        try (BinaryReadStrategy strategy = new BinaryReadStrategy()) {
            Assertions.assertThrows(
                    FileConnectorException.class,
                    () -> strategy.setPluginConfig(ConfigFactory.parseMap(configMap)));
        }
    }

    // target_path is mandatory in update mode.
    @Test
    void testUpdateModeRequiresTargetPath() throws Exception {
        Path sourceDir = tempDir.resolve("src");

        Map<String, Object> configMap = new HashMap<>();
        configMap.put("path", sourceDir.toUri().toString());
        configMap.put("file_format_type", "binary");
        configMap.put("sync_mode", "update");

        try (BinaryReadStrategy strategy = new BinaryReadStrategy()) {
            Assertions.assertThrows(
                    FileConnectorException.class,
                    () -> strategy.setPluginConfig(ConfigFactory.parseMap(configMap)));
        }
    }

    // Same length, target newer -> distcp must SKIP (file list is empty).
    @Test
    void testDistcpSkipWhenTargetNewerAndSameLength() throws Exception {
        Path sourceDir = tempDir.resolve("src");
        Path targetDir = tempDir.resolve("dst");
        Path sourceFile = sourceDir.resolve("a/b/test.bin");
        Path targetFile = targetDir.resolve("a/b/test.bin");

        writeFile(sourceFile, "abc".getBytes());
        writeFile(targetFile, "abc".getBytes());
        setMtime(sourceFile, 1_000);
        setMtime(targetFile, 2_000);

        try (BinaryReadStrategy strategy = new BinaryReadStrategy()) {
            strategy.setPluginConfig(
                    updateConfig(
                            sourceDir.toUri().toString(),
                            targetDir.toUri().toString(),
                            "distcp",
                            "len_mtime"));
            strategy.init(new LocalConf(FS_DEFAULT_NAME_DEFAULT));

            List<String> files = strategy.getFileNamesByPath(sourceDir.toUri().toString());
            Assertions.assertTrue(files.isEmpty(), "Target is newer with same len -> SKIP");
        }
    }

    // Source newer than target -> distcp must COPY (file is listed).
    @Test
    void testDistcpCopyWhenSourceNewer() throws Exception {
        Path sourceDir = tempDir.resolve("src");
        Path targetDir = tempDir.resolve("dst");
        Path sourceFile = sourceDir.resolve("test.bin");
        Path targetFile = targetDir.resolve("test.bin");

        writeFile(sourceFile, "abc".getBytes());
        writeFile(targetFile, "abc".getBytes());
        setMtime(sourceFile, 2_000);
        setMtime(targetFile, 1_000);

        try (BinaryReadStrategy strategy = new BinaryReadStrategy()) {
            strategy.setPluginConfig(
                    updateConfig(
                            sourceDir.toUri().toString(),
                            targetDir.toUri().toString(),
                            "distcp",
                            "len_mtime"));
            strategy.init(new LocalConf(FS_DEFAULT_NAME_DEFAULT));

            List<String> files = strategy.getFileNamesByPath(sourceDir.toUri().toString());
            Assertions.assertEquals(1, files.size());
            Assertions.assertTrue(files.get(0).endsWith("/test.bin"));
        }
    }

    // strict/checksum ignores mtime when contents are identical -> SKIP.
    // (Local FS exposes no checksum, so this exercises the content-comparison fallback.)
    @Test
    void testStrictChecksumSkipWhenSameContentEvenIfMtimeDiff() throws Exception {
        Path sourceDir = tempDir.resolve("src");
        Path targetDir = tempDir.resolve("dst");
        Path sourceFile = sourceDir.resolve("test.bin");
        Path targetFile = targetDir.resolve("test.bin");

        writeFile(sourceFile, "abc".getBytes());
        writeFile(targetFile, "abc".getBytes());
        setMtime(sourceFile, 1_000);
        setMtime(targetFile, 2_000);

        try (BinaryReadStrategy strategy = new BinaryReadStrategy()) {
            strategy.setPluginConfig(
                    updateConfig(
                            sourceDir.toUri().toString(),
                            targetDir.toUri().toString(),
                            "strict",
                            "checksum"));
            strategy.init(new LocalConf(FS_DEFAULT_NAME_DEFAULT));

            List<String> files = strategy.getFileNamesByPath(sourceDir.toUri().toString());
            Assertions.assertTrue(files.isEmpty(), "Checksum equal -> SKIP");
        }
    }

    // strict/checksum detects content drift even at equal length -> COPY.
    @Test
    void testStrictChecksumCopyWhenSameLengthButDifferentContent() throws Exception {
        Path sourceDir = tempDir.resolve("src");
        Path targetDir = tempDir.resolve("dst");
        Path sourceFile = sourceDir.resolve("test.bin");
        Path targetFile = targetDir.resolve("test.bin");

        writeFile(sourceFile, "abc".getBytes());
        writeFile(targetFile, "abd".getBytes());

        try (BinaryReadStrategy strategy = new BinaryReadStrategy()) {
            strategy.setPluginConfig(
                    updateConfig(
                            sourceDir.toUri().toString(),
                            targetDir.toUri().toString(),
                            "strict",
                            "checksum"));
            strategy.init(new LocalConf(FS_DEFAULT_NAME_DEFAULT));

            List<String> files = strategy.getFileNamesByPath(sourceDir.toUri().toString());
            Assertions.assertEquals(1, files.size());
            Assertions.assertTrue(files.get(0).endsWith("/test.bin"));
        }
    }

    // Creates parent directories and writes the file's content.
    private static void writeFile(Path path, byte[] content) throws IOException {
        Files.createDirectories(path.getParent());
        Files.write(path, content);
    }

    // Sets the file's last-modified time in epoch millis.
    private static void setMtime(Path path, long millis) throws IOException {
        Files.setLastModifiedTime(path, FileTime.fromMillis(millis));
    }

    // Builds a sync_mode=update binary source configuration for the given paths.
    private static Config updateConfig(
            String sourcePath, String targetPath, String updateStrategy, String compareMode) {
        Map<String, Object> configMap = new HashMap<>();
        configMap.put("path", sourcePath);
        configMap.put("file_format_type", "binary");
        configMap.put("sync_mode", "update");
        configMap.put("target_path", targetPath);
        configMap.put("update_strategy", updateStrategy);
        configMap.put("compare_mode", compareMode);
        return ConfigFactory.parseMap(configMap);
    }

    // HadoopConf backed by the local file system so tests need no HDFS cluster.
    static class LocalConf extends HadoopConf {
        private static final String HDFS_IMPL = "org.apache.hadoop.fs.LocalFileSystem";
        private static final String SCHEMA = "file";

        public LocalConf(String hdfsNameKey) {
            super(hdfsNameKey);
        }

        @Override
        public String getFsHdfsImpl() {
            return HDFS_IMPL;
        }

        @Override
        public String getSchema() {
            return SCHEMA;
        }
    }
}
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java
index 7e0de70c6e..6f8211873a 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java
@@ -27,6 +27,7 @@ import 
org.apache.seatunnel.api.table.factory.TableSourceFactory;
 import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
 import 
org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileSyncMode;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
 import 
org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.config.HdfsSourceConfigOptions;
 
@@ -94,6 +95,15 @@ public class HdfsFileSourceFactory implements 
TableSourceFactory {
                 .optional(FileBaseSourceOptions.NULL_FORMAT)
                 .optional(FileBaseSourceOptions.FILENAME_EXTENSION)
                 .optional(FileBaseSourceOptions.READ_COLUMNS)
+                .optional(
+                        FileBaseSourceOptions.SYNC_MODE,
+                        FileBaseSourceOptions.TARGET_HADOOP_CONF,
+                        FileBaseSourceOptions.UPDATE_STRATEGY,
+                        FileBaseSourceOptions.COMPARE_MODE)
+                .conditional(
+                        FileBaseSourceOptions.SYNC_MODE,
+                        FileSyncMode.UPDATE,
+                        FileBaseSourceOptions.TARGET_PATH)
                 .optional(FileBaseSourceOptions.HDFS_SITE_PATH)
                 .optional(FileBaseSourceOptions.KERBEROS_PRINCIPAL)
                 .optional(FileBaseSourceOptions.KERBEROS_KEYTAB_PATH)
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/HdfsFileSourceConfigTest.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/HdfsFileSourceConfigTest.java
index 024eaaebd8..7920765322 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/HdfsFileSourceConfigTest.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/HdfsFileSourceConfigTest.java
@@ -26,10 +26,12 @@ import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import 
org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import 
org.apache.seatunnel.connectors.seatunnel.file.hdfs.config.HdfsFileSourceConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.HdfsFileSourceFactory;
 import 
org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.config.HdfsSourceConfigOptions;
+import 
org.apache.seatunnel.connectors.seatunnel.file.source.reader.BinaryReadStrategy;
 import 
org.apache.seatunnel.connectors.seatunnel.file.source.reader.ParquetReadStrategy;
 import 
org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategy;
 import org.apache.seatunnel.connectors.seatunnel.source.SourceFlowTestUtils;
@@ -50,10 +52,14 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.condition.DisabledOnOs;
 import org.junit.jupiter.api.condition.OS;
+import org.junit.jupiter.api.io.TempDir;
 
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.attribute.FileTime;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -170,6 +176,42 @@ class HdfsFileSourceConfigTest {
         Assertions.assertEquals("hdfs_multi_source_read4", 
seaTunnelRows.get(3).getField(1));
     }
 
+    @Test
+    void testUpdateModeDistcpSkipStillProducesBinarySchema(@TempDir 
java.nio.file.Path tempDir)
+            throws IOException {
+        java.nio.file.Path sourceDir = tempDir.resolve("src");
+        java.nio.file.Path targetDir = tempDir.resolve("dst");
+        Files.createDirectories(sourceDir);
+        Files.createDirectories(targetDir);
+
+        java.nio.file.Path sourceFile = sourceDir.resolve("test.bin");
+        java.nio.file.Path targetFile = targetDir.resolve("test.bin");
+        Files.write(sourceFile, "abc".getBytes(StandardCharsets.UTF_8));
+        Files.write(targetFile, "abc".getBytes(StandardCharsets.UTF_8));
+        Files.setLastModifiedTime(sourceFile, FileTime.fromMillis(1_000));
+        Files.setLastModifiedTime(targetFile, FileTime.fromMillis(2_000));
+
+        Map<String, Object> configMap = new HashMap<>();
+        configMap.put(HdfsSourceConfigOptions.FILE_PATH.key(), 
sourceDir.toString());
+        configMap.put(HdfsSourceConfigOptions.FILE_FORMAT_TYPE.key(), 
"binary");
+        configMap.put(HdfsSourceConfigOptions.DEFAULT_FS.key(), DEFAULT_FS);
+        configMap.put(FileBaseSourceOptions.SYNC_MODE.key(), "update");
+        configMap.put(FileBaseSourceOptions.TARGET_PATH.key(), 
targetDir.toString());
+        configMap.put(FileBaseSourceOptions.UPDATE_STRATEGY.key(), "distcp");
+        configMap.put(FileBaseSourceOptions.COMPARE_MODE.key(), "len_mtime");
+
+        Config config = ConfigFactory.parseMap(configMap);
+        HdfsFileSourceConfig sourceConfig =
+                new HdfsFileSourceConfig(ReadonlyConfig.fromConfig(config));
+
+        Assertions.assertTrue(
+                sourceConfig.getFilePaths().isEmpty(),
+                "Update+distcp should filter files when target is newer and 
same length");
+        Assertions.assertEquals(
+                BinaryReadStrategy.binaryRowType,
+                sourceConfig.getCatalogTable().getSeaTunnelRowType());
+    }
+
     @AfterEach
     public void clear() throws IOException {
         deleteFile(DATA_FILE_PATH1);
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-hadoop-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/hdfs/HdfsFileIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-hadoop-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/hdfs/HdfsFileIT.java
index 40de8f03a2..fde61f1f3a 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-hadoop-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/hdfs/HdfsFileIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-hadoop-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/hdfs/HdfsFileIT.java
@@ -128,4 +128,72 @@ public class HdfsFileIT extends TestSuiteBase implements 
TestResource {
                 container.executeJob("/hdfs_normal_to_assert.conf");
         Assertions.assertEquals(0, readResult.getExitCode());
     }
+
+    @TestTemplate
+    public void testHdfsBinaryUpdateModeDistcp(TestContainer container)
+            throws IOException, InterruptedException {
+        resetUpdateTestPath();
+        putHdfsFile("/update/src/test.bin", "abc");
+
+        org.testcontainers.containers.Container.ExecResult firstRun =
+                container.executeJob("/hdfs_binary_update_distcp.conf");
+        Assertions.assertEquals(0, firstRun.getExitCode());
+        Assertions.assertEquals("abc", readHdfsFile("/update/dst/test.bin"));
+
+        // Make target newer with same length, distcp strategy should SKIP 
overwrite.
+        putHdfsFile("/update/dst/test.bin", "zzz");
+        org.testcontainers.containers.Container.ExecResult secondRun =
+                container.executeJob("/hdfs_binary_update_distcp.conf");
+        Assertions.assertEquals(0, secondRun.getExitCode());
+        Assertions.assertEquals("zzz", readHdfsFile("/update/dst/test.bin"));
+
+        // Change source length, distcp strategy should COPY overwrite.
+        putHdfsFile("/update/src/test.bin", "abcd");
+        org.testcontainers.containers.Container.ExecResult thirdRun =
+                container.executeJob("/hdfs_binary_update_distcp.conf");
+        Assertions.assertEquals(0, thirdRun.getExitCode());
+        Assertions.assertEquals("abcd", readHdfsFile("/update/dst/test.bin"));
+    }
+
+    @TestTemplate
+    public void testHdfsBinaryUpdateModeStrictChecksum(TestContainer container)
+            throws IOException, InterruptedException {
+        resetUpdateTestPath();
+        putHdfsFile("/update/src/test.bin", "abc");
+
+        org.testcontainers.containers.Container.ExecResult firstRun =
+                
container.executeJob("/hdfs_binary_update_strict_checksum.conf");
+        Assertions.assertEquals(0, firstRun.getExitCode());
+        Assertions.assertEquals("abc", readHdfsFile("/update/dst/test.bin"));
+
+        // Same length but different content, strict+checksum should COPY 
overwrite.
+        putHdfsFile("/update/dst/test.bin", "zzz");
+        org.testcontainers.containers.Container.ExecResult secondRun =
+                
container.executeJob("/hdfs_binary_update_strict_checksum.conf");
+        Assertions.assertEquals(0, secondRun.getExitCode());
+        Assertions.assertEquals("abc", readHdfsFile("/update/dst/test.bin"));
+    }
+
+    private void resetUpdateTestPath() throws IOException, 
InterruptedException {
+        nameNode.execInContainer("bash", "-c", "hdfs dfs -rm -r -f /update || 
true");
+        org.testcontainers.containers.Container.ExecResult mkdirResult =
+                nameNode.execInContainer(
+                        "hdfs", "dfs", "-mkdir", "-p", "/update/src", 
"/update/dst", "/update/tmp");
+        Assertions.assertEquals(0, mkdirResult.getExitCode());
+    }
+
+    private void putHdfsFile(String hdfsPath, String content)
+            throws IOException, InterruptedException {
+        String command = "printf '" + content + "' | hdfs dfs -put -f - " + 
hdfsPath;
+        org.testcontainers.containers.Container.ExecResult putResult =
+                nameNode.execInContainer("bash", "-c", command);
+        Assertions.assertEquals(0, putResult.getExitCode());
+    }
+
+    private String readHdfsFile(String hdfsPath) throws IOException, 
InterruptedException {
+        org.testcontainers.containers.Container.ExecResult catResult =
+                nameNode.execInContainer("hdfs", "dfs", "-cat", hdfsPath);
+        Assertions.assertEquals(0, catResult.getExitCode());
+        return catResult.getStdout() == null ? "" : 
catResult.getStdout().trim();
+    }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-hadoop-e2e/src/test/resources/hdfs_binary_update_distcp.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-hadoop-e2e/src/test/resources/hdfs_binary_update_distcp.conf
new file mode 100644
index 0000000000..6dbdbdb999
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-hadoop-e2e/src/test/resources/hdfs_binary_update_distcp.conf
@@ -0,0 +1,53 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  HdfsFile {
+    fs.defaultFS = "hdfs://namenode1:9000"
+    path = "/update/src"
+    file_format_type = "binary"
+
+    sync_mode = "update"
+    target_path = "/update/dst"
+    update_strategy = "distcp"
+    compare_mode = "len_mtime"
+
+    hadoop_conf = {
+      "dfs.replication" = 1
+    }
+  }
+}
+
+sink {
+  HdfsFile {
+    fs.defaultFS = "hdfs://namenode1:9000"
+    path = "/update/dst"
+    tmp_path = "/update/tmp"
+    file_format_type = "binary"
+    data_save_mode = "APPEND_DATA"
+
+    hadoop_conf = {
+      "dfs.replication" = 1
+    }
+  }
+}
+
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-hadoop-e2e/src/test/resources/hdfs_binary_update_strict_checksum.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-hadoop-e2e/src/test/resources/hdfs_binary_update_strict_checksum.conf
new file mode 100644
index 0000000000..7dd180d43a
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-hadoop-e2e/src/test/resources/hdfs_binary_update_strict_checksum.conf
@@ -0,0 +1,53 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  HdfsFile {
+    fs.defaultFS = "hdfs://namenode1:9000"
+    path = "/update/src"
+    file_format_type = "binary"
+
+    sync_mode = "update"
+    target_path = "/update/dst"
+    update_strategy = "strict"
+    compare_mode = "checksum"
+
+    hadoop_conf = {
+      "dfs.replication" = 1
+    }
+  }
+}
+
+sink {
+  HdfsFile {
+    fs.defaultFS = "hdfs://namenode1:9000"
+    path = "/update/dst"
+    tmp_path = "/update/tmp"
+    file_format_type = "binary"
+    data_save_mode = "APPEND_DATA"
+
+    hadoop_conf = {
+      "dfs.replication" = 1
+    }
+  }
+}
+

Reply via email to