This is an automated email from the ASF dual-hosted git repository.
shenghang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new c134273bcf [Feature][Connector-File-Hadoop] Support sync_mode=update for HdfsFile source (binary) (#10268)
c134273bcf is described below
commit c134273bcfba2ef6fe138abf79d92268e6acc81c
Author: yzeng1618 <[email protected]>
AuthorDate: Sun Jan 11 20:52:33 2026 +0800
[Feature][Connector-File-Hadoop] Support sync_mode=update for HdfsFile source (binary) (#10268)
Co-authored-by: zengyi <[email protected]>
---
docs/en/connector-v2/source/HdfsFile.md | 44 ++-
docs/zh/connector-v2/source/HdfsFile.md | 44 ++-
.../file/hdfs/source/BaseHdfsFileSource.java | 10 +-
.../file/config/BaseFileSourceConfig.java | 7 +
.../file/config/FileBaseSourceOptions.java | 47 +++
.../seatunnel/file/config/FileCompareMode.java | 25 ++
.../seatunnel/file/config/FileSyncMode.java | 25 ++
.../seatunnel/file/config/FileUpdateStrategy.java | 25 ++
.../file/hadoop/HadoopFileSystemProxy.java | 5 +
.../file/source/reader/AbstractReadStrategy.java | 354 ++++++++++++++++++++-
.../file/source/reader/UpdateSyncModeTest.java | 245 ++++++++++++++
.../file/hdfs/source/HdfsFileSourceFactory.java | 10 +
.../file/hdfs/HdfsFileSourceConfigTest.java | 42 +++
.../e2e/connector/file/hdfs/HdfsFileIT.java | 68 ++++
.../test/resources/hdfs_binary_update_distcp.conf | 53 +++
.../hdfs_binary_update_strict_checksum.conf | 53 +++
16 files changed, 1048 insertions(+), 9 deletions(-)
diff --git a/docs/en/connector-v2/source/HdfsFile.md b/docs/en/connector-v2/source/HdfsFile.md
index 2ebb6de0d4..dc9827004c 100644
--- a/docs/en/connector-v2/source/HdfsFile.md
+++ b/docs/en/connector-v2/source/HdfsFile.md
@@ -80,6 +80,11 @@ Read data from hdfs file system.
| null_format | string | no | - | Only used when file_format_type is text. null_format defines which strings can be represented as null. e.g: `\N` |
| binary_chunk_size | int | no | 1024 | Only used when file_format_type is binary. The chunk size (in bytes) for reading binary files. Default is 1024 bytes. Larger values may improve performance for large files but use more memory. |
| binary_complete_file_mode | boolean | no | false | Only used when file_format_type is binary. Whether to read the complete file as a single chunk instead of splitting into chunks. When enabled, the entire file content will be read into memory at once. Default is false. |
+| sync_mode | string | no | full | File sync mode. Supported values: `full`, `update`. When set to `update`, the source compares files between source and target and only reads new/changed files (currently only supports `file_format_type=binary`). |
+| target_path | string | no | - | Only used when `sync_mode=update`. Target base path used for comparison (it should usually be the same as the sink `path`). |
+| target_hadoop_conf | map | no | - | Only used when `sync_mode=update`. Extra Hadoop configuration for the target filesystem. You can set `fs.defaultFS` in this map to override the target defaultFS. |
+| update_strategy | string | no | distcp | Only used when `sync_mode=update`. Supported values: `distcp` (default), `strict`. |
+| compare_mode | string | no | len_mtime | Only used when `sync_mode=update`. Supported values: `len_mtime` (default), `checksum` (only valid when `update_strategy=strict`). |
| common-options | | no | - | Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details. |
| file_filter_modified_start | string | no | - | File modification time filter. The connector will filter some files based on the last modification start time (start time included). The default date format is `yyyy-MM-dd HH:mm:ss`. |
| file_filter_modified_end | string | no | - | File modification time filter. The connector will filter some files based on the last modification end time (end time excluded). The default date format is `yyyy-MM-dd HH:mm:ss`. |
@@ -212,6 +217,43 @@ Only used when file_format_type is binary.
Whether to read the complete file as a single chunk instead of splitting into chunks. When enabled, the entire file content will be read into memory at once. Default is false.
+### sync_mode [string]
+
+File sync mode. Supported values: `full` (default), `update`.
+
+When `sync_mode=update`, the source will compare files between source and target and only read new/changed files (currently only supports `file_format_type=binary`).
+
+### target_path [string]
+
+Only used when `sync_mode=update`.
+
+Target base path used for comparison (it should usually be the same as the sink `path`).
+
+### target_hadoop_conf [map]
+
+Only used when `sync_mode=update`.
+
+Extra Hadoop configuration for the target filesystem (optional). If not set, the source filesystem configuration is reused.
+
+You can set `fs.defaultFS` in this map to override the target defaultFS, e.g. `"fs.defaultFS" = "hdfs://nn2:9000"`.
+
+### update_strategy [string]
+
+Only used when `sync_mode=update`. Supported values: `distcp` (default), `strict`.
+
+- `distcp`: similar to `distcp -update`:
+  - target file does not exist → COPY
+  - length differs → COPY
+  - `mtime(source) > mtime(target)` → COPY
+  - otherwise → SKIP
+- `strict`: strict consistency, decided by `compare_mode`.
+
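The decision rules above amount to a small pure function. As a minimal sketch (hypothetical class and method names, not the connector's actual API), the distcp-style check could be written as:

```java
public class DistcpUpdateDecision {
    /**
     * Returns true when the source file should be copied, following the
     * distcp -update style rules: missing target, differing length, or a
     * newer source mtime all trigger a copy; otherwise the file is skipped.
     * Null target values model a target file that does not exist.
     */
    public static boolean shouldCopy(long srcLen, long srcMtime, Long tgtLen, Long tgtMtime) {
        if (tgtLen == null || tgtMtime == null) {
            return true; // target file does not exist -> COPY
        }
        if (srcLen != tgtLen) {
            return true; // length differs -> COPY
        }
        return srcMtime > tgtMtime; // source newer -> COPY, else SKIP
    }

    public static void main(String[] args) {
        System.out.println(shouldCopy(10, 100, null, null)); // missing target
        System.out.println(shouldCopy(10, 100, 20L, 100L));  // length differs
        System.out.println(shouldCopy(10, 200, 10L, 100L));  // source newer
        System.out.println(shouldCopy(10, 100, 10L, 100L));  // identical
    }
}
```

Note that an older source over a newer target is skipped under `distcp`, which is exactly the case `strict` with `len_mtime` would still copy.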
+### compare_mode [string]
+
+Only used when `sync_mode=update`. Supported values: `len_mtime` (default), `checksum`.
+
+- `len_mtime`: SKIP only when both `len` and `mtime` are equal, otherwise COPY.
+- `checksum`: SKIP only when `len` is equal and the Hadoop `getFileChecksum` values are equal, otherwise COPY (only valid when `update_strategy=strict`).
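Taken together, a hypothetical `update`-mode source block could look like this (host names and paths are illustrative, not from this patch):

```hocon
source {
  HdfsFile {
    fs.defaultFS = "hdfs://nn1:9000"
    path = "/data/src"
    file_format_type = "binary"
    sync_mode = "update"
    target_path = "/data/dst"
    target_hadoop_conf = { "fs.defaultFS" = "hdfs://nn2:9000" }
    update_strategy = "strict"
    compare_mode = "checksum"
  }
}
```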
### quote_char [string]
A single character that encloses CSV fields, allowing fields with commas, line breaks, or quotes to be read correctly.
@@ -335,4 +377,4 @@ sink {
## Changelog
-<ChangeLog />
\ No newline at end of file
+<ChangeLog />
diff --git a/docs/zh/connector-v2/source/HdfsFile.md b/docs/zh/connector-v2/source/HdfsFile.md
index 05b49ec67f..2fba533ec8 100644
--- a/docs/zh/connector-v2/source/HdfsFile.md
+++ b/docs/zh/connector-v2/source/HdfsFile.md
@@ -80,6 +80,11 @@ import ChangeLog from '../changelog/connector-file-hadoop.md';
| null_format | string | 否 | - | 仅在 file_format_type 为 text 时使用。null_format 定义哪些字符串可以表示为 null。例如:`\N` |
| binary_chunk_size | int | 否 | 1024 | 仅在 file_format_type 为 binary 时使用。读取二进制文件的块大小(以字节为单位)。默认为 1024 字节。较大的值可能会提高大文件的性能,但会使用更多内存。 |
| binary_complete_file_mode | boolean | 否 | false | 仅在 file_format_type 为 binary 时使用。是否将完整文件作为单个块读取,而不是分割成块。启用时,整个文件内容将一次性读入内存。默认为 false。 |
+| sync_mode | string | 否 | full | 文件同步模式,支持:`full`(默认)、`update`。当 `update` 时,对源/目标进行对比,只读取新增/变更文件(目前仅支持 `file_format_type=binary`)。 |
+| target_path | string | 否 | - | 仅在 `sync_mode=update` 时使用。目标端基础路径(通常应与 sink 的 `path` 一致),用于对比同相对路径文件。 |
+| target_hadoop_conf | map | 否 | - | 仅在 `sync_mode=update` 时使用。目标端 Hadoop 配置(可选),可在其中设置 `fs.defaultFS` 覆盖目标 defaultFS。 |
+| update_strategy | string | 否 | distcp | 仅在 `sync_mode=update` 时使用。支持:`distcp`(默认)、`strict`。 |
+| compare_mode | string | 否 | len_mtime | 仅在 `sync_mode=update` 时使用。支持:`len_mtime`(默认)、`checksum`(仅在 `update_strategy=strict` 时可用)。 |
| common-options | | 否 | - | 数据源插件通用参数,请参阅 [数据源通用选项](../source-common-options.md) 了解详情。 |
| file_filter_modified_start | string | 否 | - | 按照最后修改时间过滤文件。要过滤的开始时间(包括该时间),时间格式是:`yyyy-MM-dd HH:mm:ss` |
| file_filter_modified_end | string | 否 | - | 按照最后修改时间过滤文件。要过滤的结束时间(不包括该时间),时间格式是:`yyyy-MM-dd HH:mm:ss` |
@@ -213,6 +218,43 @@ abc.*
是否将完整文件作为单个块读取,而不是分割成块。启用时,整个文件内容将一次性读入内存。默认为 false。
+### sync_mode [string]
+
+文件同步模式,支持:`full`(默认)、`update`。
+
+当 `sync_mode=update` 时,会在读取端对源/目标进行对比,只读取新增/变更文件(目前仅支持 `file_format_type=binary`)。
+
+### target_path [string]
+
+仅在 `sync_mode=update` 时使用。
+
+目标端基础路径(通常应与 sink 的 `path` 保持一致),用于对比同相对路径的目标文件是否存在/是否需要更新。
+
+### target_hadoop_conf [map]
+
+仅在 `sync_mode=update` 时使用。
+
+用于访问目标文件系统的 Hadoop 配置(可选)。当不配置时默认复用 source 端的文件系统配置。
+
+可在该 map 中指定 `fs.defaultFS` 来覆盖目标端 defaultFS,例如:`"fs.defaultFS" = "hdfs://nn2:9000"`。
+
+### update_strategy [string]
+
+仅在 `sync_mode=update` 时使用。支持:`distcp`(默认)、`strict`。
+
+- `distcp`:更接近 `distcp -update` 的语义:
+  - 目标文件不存在 → COPY
+  - 长度不同 → COPY
+  - `mtime(source) > mtime(target)` → COPY
+  - 否则 → SKIP
+- `strict`:严格一致性,配合 `compare_mode` 判断是否 SKIP。
+
+### compare_mode [string]
+
+仅在 `sync_mode=update` 时使用。支持:`len_mtime`(默认)、`checksum`。
+
+- `len_mtime`:`len` 与 `mtime` 都相同才 SKIP,否则 COPY。
+- `checksum`:要求 `len` 相同且 Hadoop `getFileChecksum` 相同才 SKIP,否则 COPY(仅在 `update_strategy=strict` 时生效)。
### quote_char [string]
用于包裹 CSV 字段的单字符,可保证包含逗号、换行符或引号的字段被正确解析。
@@ -336,4 +378,4 @@ sink {
## 变更日志
-<ChangeLog />
\ No newline at end of file
+<ChangeLog />
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
index 7e93e39cec..1a473b2bf3 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
@@ -136,8 +136,14 @@ public abstract class BaseHdfsFileSource extends BaseFileSource {
             }
         } else {
             if (filePaths.isEmpty()) {
-                // When the directory is empty, distribute default behavior schema
-                rowType = CatalogTableUtil.buildSimpleTextSchema();
+                // When there are no files (including sync_mode=update filtered all files), choose a
+                // compatible schema so that downstream can initialize correctly.
+                if (fileFormat == FileFormat.BINARY) {
+                    rowType = readStrategy.getSeaTunnelRowTypeInfo(path);
+                } else {
+                    // fallback schema when schema cannot be inferred from files
+                    rowType = CatalogTableUtil.buildSimpleTextSchema();
+                }
                 return;
             }
             try {
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSourceConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSourceConfig.java
index 56f8b234cb..b1bb7b0fe2 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSourceConfig.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSourceConfig.java
@@ -88,6 +88,13 @@ public abstract class BaseFileSourceConfig implements Serializable {
             catalogTable = CatalogTableUtil.buildSimpleTextTable();
         }
         if (CollectionUtils.isEmpty(filePaths)) {
+            // When there are no files (including sync_mode=update filtered all files), choose a
+            // compatible schema so that downstream can initialize correctly.
+            if (fileFormat == FileFormat.BINARY) {
+                String rootPath = readonlyConfig.get(FileBaseSourceOptions.FILE_PATH);
+                return newCatalogTable(
+                        catalogTable, readStrategy.getSeaTunnelRowTypeInfo(rootPath));
+            }
             return catalogTable;
         }
         switch (fileFormat) {
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileBaseSourceOptions.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileBaseSourceOptions.java
index a103220c99..67adcc56c6 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileBaseSourceOptions.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileBaseSourceOptions.java
@@ -21,7 +21,9 @@ import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
 import org.apache.seatunnel.format.text.constant.TextFormatConstant;
+import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 public class FileBaseSourceOptions extends FileBaseOptions {
     public static final String DEFAULT_ROW_DELIMITER = "\n";
@@ -128,6 +130,51 @@ public class FileBaseSourceOptions extends FileBaseOptions {
                             "Whether to read the complete file as a single chunk instead of splitting into chunks. "
                                     + "When enabled, the entire file content will be read into memory at once. Only valid when file_format_type is binary.");
+    public static final Option<FileSyncMode> SYNC_MODE =
+            Options.key("sync_mode")
+                    .singleChoice(
+                            FileSyncMode.class,
+                            Arrays.asList(FileSyncMode.FULL, FileSyncMode.UPDATE))
+                    .defaultValue(FileSyncMode.FULL)
+                    .withDescription(
+                            "File sync mode. Supported values: full, update. "
+                                    + "When set to update, the source will compare with the target and only read new/changed files. "
+                                    + "Currently, update mode only supports file_format_type=binary.");
+
+    public static final Option<String> TARGET_PATH =
+            Options.key("target_path")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Target base path for sync_mode=update comparison.");
+
+    public static final Option<Map<String, String>> TARGET_HADOOP_CONF =
+            Options.key("target_hadoop_conf")
+                    .mapType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Extra Hadoop configuration for the target filesystem in sync_mode=update. "
+                                    + "Use key 'fs.defaultFS' to override the target defaultFS if needed.");
+
+    public static final Option<FileUpdateStrategy> UPDATE_STRATEGY =
+            Options.key("update_strategy")
+                    .singleChoice(
+                            FileUpdateStrategy.class,
+                            Arrays.asList(FileUpdateStrategy.DISTCP, FileUpdateStrategy.STRICT))
+                    .defaultValue(FileUpdateStrategy.DISTCP)
+                    .withDescription(
+                            "Update strategy when sync_mode=update. Supported values: distcp, strict. "
+                                    + "distcp behaves like 'distcp -update' (compares length and mtime, copying only when the source mtime is newer). "
+                                    + "strict requires exact consistency depending on compare_mode.");
+
+    public static final Option<FileCompareMode> COMPARE_MODE =
+            Options.key("compare_mode")
+                    .singleChoice(
+                            FileCompareMode.class,
+                            Arrays.asList(FileCompareMode.LEN_MTIME, FileCompareMode.CHECKSUM))
+                    .defaultValue(FileCompareMode.LEN_MTIME)
+                    .withDescription(
+                            "Compare mode when sync_mode=update. Supported values: len_mtime, checksum. "
+                                    + "checksum uses Hadoop FileSystem#getFileChecksum, only valid when update_strategy=strict.");
     public static final Option<String> QUOTE_CHAR =
             Options.key("quote_char")
                     .stringType()
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileCompareMode.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileCompareMode.java
new file mode 100644
index 0000000000..0d398866a0
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileCompareMode.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.config;
+
+import java.io.Serializable;
+
+public enum FileCompareMode implements Serializable {
+ LEN_MTIME,
+ CHECKSUM
+}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSyncMode.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSyncMode.java
new file mode 100644
index 0000000000..a4941101bf
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSyncMode.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.config;
+
+import java.io.Serializable;
+
+public enum FileSyncMode implements Serializable {
+ FULL,
+ UPDATE
+}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileUpdateStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileUpdateStrategy.java
new file mode 100644
index 0000000000..1acf1d3634
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileUpdateStrategy.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.config;
+
+import java.io.Serializable;
+
+public enum FileUpdateStrategy implements Serializable {
+ DISTCP,
+ STRICT
+}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hadoop/HadoopFileSystemProxy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hadoop/HadoopFileSystemProxy.java
index 0c62d26bd4..cdee0214ec 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hadoop/HadoopFileSystemProxy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hadoop/HadoopFileSystemProxy.java
@@ -27,6 +27,7 @@ import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
@@ -188,6 +189,10 @@ public class HadoopFileSystemProxy implements Serializable, Closeable {
         return execute(() -> getFileSystem().getFileStatus(new Path(filePath)));
     }
+    public FileChecksum getFileChecksum(String filePath) throws IOException {
+        return execute(() -> getFileSystem().getFileChecksum(new Path(filePath)));
+    }
+
     public FSDataOutputStream getOutputStream(String filePath) throws IOException {
         return execute(() -> getFileSystem().create(new Path(filePath), true));
     }
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
index c0e8d2a90f..e55edc13f2 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
@@ -18,8 +18,11 @@
 package org.apache.seatunnel.connectors.seatunnel.file.source.reader;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigObject;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueType;
 import org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
@@ -30,8 +33,12 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.utils.SeaTunnelException;
 import org.apache.seatunnel.connectors.seatunnel.file.config.ArchiveCompressFormat;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileCompareMode;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileSyncMode;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileUpdateStrategy;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;
 import org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit;
@@ -40,12 +47,15 @@ import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
 import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
 import org.apache.commons.compress.compressors.gzip.GzipParameters;
 import org.apache.commons.io.input.BoundedInputStream;
+import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
 import lombok.extern.slf4j.Slf4j;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.math.BigDecimal;
@@ -58,6 +68,7 @@ import java.util.Arrays;
 import java.util.Date;
 import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
 import java.util.regex.Pattern;
@@ -101,10 +112,24 @@ public abstract class AbstractReadStrategy implements ReadStrategy {
     protected boolean enableSplitFile;
+    protected String sourceRootPath;
+    protected boolean enableUpdateSync;
+    protected String targetPath;
+    protected FileUpdateStrategy updateStrategy =
+            FileBaseSourceOptions.UPDATE_STRATEGY.defaultValue();
+    protected FileCompareMode compareMode = FileBaseSourceOptions.COMPARE_MODE.defaultValue();
+    protected Map<String, String> targetHadoopConf;
+    protected transient HadoopFileSystemProxy targetHadoopFileSystemProxy;
+    protected transient boolean shareTargetFileSystemProxy;
+    protected transient boolean checksumUnavailableWarned;
+
     @Override
     public void init(HadoopConf conf) {
         this.hadoopConf = conf;
         this.hadoopFileSystemProxy = new HadoopFileSystemProxy(hadoopConf);
+        if (enableUpdateSync) {
+            initTargetHadoopFileSystemProxy();
+        }
     }
     @Override
@@ -140,14 +165,18 @@ public abstract class AbstractReadStrategy implements ReadStrategy {
             if (!readPartitions.isEmpty()) {
                 for (String readPartition : readPartitions) {
                     if (filePath.contains(readPartition)) {
-                        fileNames.add(filePath);
-                        this.fileNames.add(filePath);
+                        if (shouldSyncFileInUpdateMode(fileStatus)) {
+                            fileNames.add(filePath);
+                            this.fileNames.add(filePath);
+                        }
                         break;
                     }
                 }
             } else {
-                fileNames.add(filePath);
-                this.fileNames.add(filePath);
+                if (shouldSyncFileInUpdateMode(fileStatus)) {
+                    fileNames.add(filePath);
+                    this.fileNames.add(filePath);
+                }
             }
         }
     }
@@ -234,7 +263,9 @@ public abstract class AbstractReadStrategy implements ReadStrategy {
             this.pattern = Pattern.compile(filterPattern);
             // because 'ConfigFactory.systemProperties()' has a 'path' parameter, it is necessary to
             // obtain 'path' under the premise of 'FILE_FILTER_PATTERN'
-            if (pluginConfig.hasPath(FileBaseSourceOptions.FILE_PATH.key())) {
+            if (pluginConfig.hasPath(FileBaseSourceOptions.FILE_PATH.key())
+                    && pluginConfig.getValue(FileBaseSourceOptions.FILE_PATH.key()).valueType()
+                            == ConfigValueType.STRING) {
                 fileBasePath = pluginConfig.getString(FileBaseSourceOptions.FILE_PATH.key());
             }
         }
@@ -254,6 +285,25 @@ public abstract class AbstractReadStrategy implements ReadStrategy {
             enableSplitFile = pluginConfig.getBoolean(FileBaseSourceOptions.ENABLE_FILE_SPLIT.key());
         }
+
+        if (pluginConfig.hasPath(FileBaseSourceOptions.FILE_PATH.key())
+                && pluginConfig.getValue(FileBaseSourceOptions.FILE_PATH.key()).valueType()
+                        == ConfigValueType.STRING) {
+            sourceRootPath = pluginConfig.getString(FileBaseSourceOptions.FILE_PATH.key());
+        }
+
+        FileSyncMode syncMode = FileBaseSourceOptions.SYNC_MODE.defaultValue();
+        if (pluginConfig.hasPath(FileBaseSourceOptions.SYNC_MODE.key())) {
+            syncMode =
+                    parseEnumValue(
+                            FileSyncMode.class,
+                            pluginConfig.getString(FileBaseSourceOptions.SYNC_MODE.key()),
+                            FileBaseSourceOptions.SYNC_MODE.key());
+        }
+        enableUpdateSync = syncMode == FileSyncMode.UPDATE;
+        if (enableUpdateSync) {
+            validateUpdateSyncConfig(pluginConfig);
+        }
     }
@Override
@@ -473,10 +523,304 @@ public abstract class AbstractReadStrategy implements ReadStrategy {
     @Override
     public void close() throws IOException {
         try {
+            if (targetHadoopFileSystemProxy != null && !shareTargetFileSystemProxy) {
+                targetHadoopFileSystemProxy.close();
+            }
             if (hadoopFileSystemProxy != null) {
                 hadoopFileSystemProxy.close();
             }
         } catch (Exception ignore) {
         }
     }
+
+    private void validateUpdateSyncConfig(Config pluginConfig) {
+        if (!pluginConfig.hasPath(FileBaseSourceOptions.FILE_FORMAT_TYPE.key())) {
+            throw new FileConnectorException(
+                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                    "When sync_mode=update, file_format_type must be set.");
+        }
+        FileFormat fileFormat =
+                FileFormat.valueOf(
+                        pluginConfig
+                                .getString(FileBaseSourceOptions.FILE_FORMAT_TYPE.key())
+                                .toUpperCase());
+        if (fileFormat != FileFormat.BINARY) {
+            throw new FileConnectorException(
+                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                    "sync_mode=update currently only supports file_format_type=binary.");
+        }
+
+        if (!pluginConfig.hasPath(FileBaseSourceOptions.TARGET_PATH.key())
+                || StringUtils.isBlank(
+                        pluginConfig.getString(FileBaseSourceOptions.TARGET_PATH.key()))) {
+            throw new FileConnectorException(
+                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                    "When sync_mode=update, target_path must be set.");
+        }
+        targetPath = pluginConfig.getString(FileBaseSourceOptions.TARGET_PATH.key()).trim();
+
+        updateStrategy = FileBaseSourceOptions.UPDATE_STRATEGY.defaultValue();
+        if (pluginConfig.hasPath(FileBaseSourceOptions.UPDATE_STRATEGY.key())) {
+            updateStrategy =
+                    parseEnumValue(
+                            FileUpdateStrategy.class,
+                            pluginConfig.getString(FileBaseSourceOptions.UPDATE_STRATEGY.key()),
+                            FileBaseSourceOptions.UPDATE_STRATEGY.key());
+        }
+
+        compareMode = FileBaseSourceOptions.COMPARE_MODE.defaultValue();
+        if (pluginConfig.hasPath(FileBaseSourceOptions.COMPARE_MODE.key())) {
+            compareMode =
+                    parseEnumValue(
+                            FileCompareMode.class,
+                            pluginConfig.getString(FileBaseSourceOptions.COMPARE_MODE.key()),
+                            FileBaseSourceOptions.COMPARE_MODE.key());
+        }
+        if (updateStrategy == FileUpdateStrategy.DISTCP
+                && compareMode != FileCompareMode.LEN_MTIME) {
+            throw new FileConnectorException(
+                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                    "compare_mode="
+                            + compareMode.name().toLowerCase(Locale.ROOT)
+                            + " is not supported when update_strategy=distcp.");
+        }
+
+        if (pluginConfig.hasPath(FileBaseSourceOptions.TARGET_HADOOP_CONF.key())) {
+            ConfigObject configObject =
+                    pluginConfig.getObject(FileBaseSourceOptions.TARGET_HADOOP_CONF.key());
+            Map<String, Object> raw = configObject.unwrapped();
+            Map<String, String> conf = new LinkedHashMap<>(raw.size());
+            raw.forEach((k, v) -> conf.put(k, v == null ? null : String.valueOf(v)));
+            targetHadoopConf = conf;
+        }
+    }
+
+    private void initTargetHadoopFileSystemProxy() {
+        HadoopConf targetConf = buildTargetHadoopConf();
+        if (targetConf == this.hadoopConf) {
+            targetHadoopFileSystemProxy = this.hadoopFileSystemProxy;
+            shareTargetFileSystemProxy = true;
+        } else {
+            targetHadoopFileSystemProxy = new HadoopFileSystemProxy(targetConf);
+            shareTargetFileSystemProxy = false;
+        }
+    }
+
+    private HadoopConf buildTargetHadoopConf() {
+        if (!enableUpdateSync) {
+            return this.hadoopConf;
+        }
+        Map<String, String> extraOptions =
+                targetHadoopConf == null
+                        ? new LinkedHashMap<>()
+                        : new LinkedHashMap<>(targetHadoopConf);
+
+        String fsDefaultNameKey = hadoopConf.getFsDefaultNameKey();
+        String targetDefaultFs = extraOptions.remove(fsDefaultNameKey);
+
+        if (StringUtils.isBlank(targetDefaultFs)) {
+            targetDefaultFs = tryDeriveDefaultFsFromPath(targetPath);
+        }
+        if (StringUtils.isBlank(targetDefaultFs)) {
+            targetDefaultFs = hadoopConf.getHdfsNameKey();
+        }
+
+        boolean needNewConf =
+                !extraOptions.isEmpty()
+                        || !Objects.equals(targetDefaultFs, hadoopConf.getHdfsNameKey());
+        if (!needNewConf) {
+            return this.hadoopConf;
+        }
+
+        HadoopConf conf = new HadoopConf(targetDefaultFs);
+        conf.setHdfsSitePath(hadoopConf.getHdfsSitePath());
+        conf.setRemoteUser(hadoopConf.getRemoteUser());
+        conf.setKrb5Path(hadoopConf.getKrb5Path());
+        conf.setKerberosPrincipal(hadoopConf.getKerberosPrincipal());
+        conf.setKerberosKeytabPath(hadoopConf.getKerberosKeytabPath());
+        conf.setExtraOptions(extraOptions);
+        return conf;
+    }
+
+    private static String tryDeriveDefaultFsFromPath(String basePath) {
+        if (StringUtils.isBlank(basePath)) {
+            return null;
+        }
+        try {
+            Path path = new Path(basePath);
+            if (path.toUri().getScheme() == null) {
+                return null;
+            }
+            if (path.toUri().getAuthority() == null) {
+                return null;
+            }
+            return path.toUri().getScheme() + "://" + path.toUri().getAuthority();
+        } catch (Exception e) {
+            return null;
+        }
+    }
+
+ private boolean shouldSyncFileInUpdateMode(FileStatus sourceFileStatus)
throws IOException {
+ if (!enableUpdateSync) {
+ return true;
+ }
+ if (targetHadoopFileSystemProxy == null) {
+ initTargetHadoopFileSystemProxy();
+ }
+ String sourceFilePath = sourceFileStatus.getPath().toString();
+ String relativePath = resolveRelativePath(sourceRootPath, sourceFilePath);
+ String targetFilePath = buildTargetFilePath(targetPath, relativePath);
+
+ FileStatus targetFileStatus;
+ try {
+ targetFileStatus = targetHadoopFileSystemProxy.getFileStatus(targetFilePath);
+ } catch (FileNotFoundException e) {
+ return true;
+ }
+
+ long sourceLen = sourceFileStatus.getLen();
+ long targetLen = targetFileStatus.getLen();
+ if (sourceLen != targetLen) {
+ return true;
+ }
+
+ long sourceMtime = sourceFileStatus.getModificationTime();
+ long targetMtime = targetFileStatus.getModificationTime();
+
+ if (updateStrategy == FileUpdateStrategy.DISTCP) {
+ return sourceMtime > targetMtime;
+ }
+
+ if (updateStrategy == FileUpdateStrategy.STRICT) {
+ if (compareMode == FileCompareMode.LEN_MTIME) {
+ return sourceMtime != targetMtime;
+ }
+ if (compareMode == FileCompareMode.CHECKSUM) {
+ FileChecksum sourceChecksum = hadoopFileSystemProxy.getFileChecksum(sourceFilePath);
+ FileChecksum targetChecksum = targetHadoopFileSystemProxy.getFileChecksum(targetFilePath);
+ if (sourceChecksum == null || targetChecksum == null) {
+ if (!checksumUnavailableWarned) {
+ log.warn(
+ "File checksum is not available, fallback to content comparison. source={}, target={}",
+ sourceFilePath,
+ targetFilePath);
+ checksumUnavailableWarned = true;
+ }
+ try {
+ return !fileContentEquals(sourceFilePath, targetFilePath);
+ } catch (Exception e) {
+ log.warn(
+ "Fallback content comparison failed, fallback to COPY. source={}, target={}",
+ sourceFilePath,
+ targetFilePath,
+ e);
+ return true;
+ }
+ }
+ return !checksumEquals(sourceChecksum, targetChecksum);
+ }
+ }
+
+ return true;
+ }
+
+ private static boolean checksumEquals(FileChecksum source, FileChecksum target) {
+ if (source == null || target == null) {
+ return false;
+ }
+ return Objects.equals(source.getAlgorithmName(), target.getAlgorithmName())
+ && source.getLength() == target.getLength()
+ && Arrays.equals(source.getBytes(), target.getBytes());
+ }
+
+ private boolean fileContentEquals(String sourceFilePath, String targetFilePath)
+ throws IOException {
+ try (InputStream sourceIn = hadoopFileSystemProxy.getInputStream(sourceFilePath);
+ InputStream targetIn = targetHadoopFileSystemProxy.getInputStream(targetFilePath)) {
+ byte[] sourceBuffer = new byte[8 * 1024];
+ byte[] targetBuffer = new byte[8 * 1024];
+
+ while (true) {
+ int sourceRead = sourceIn.read(sourceBuffer);
+ int targetRead = targetIn.read(targetBuffer);
+ if (sourceRead != targetRead) {
+ return false;
+ }
+ if (sourceRead == -1) {
+ return true;
+ }
+ for (int i = 0; i < sourceRead; i++) {
+ if (sourceBuffer[i] != targetBuffer[i]) {
+ return false;
+ }
+ }
+ }
+ }
+ }
+
+ private static String buildTargetFilePath(String targetBasePath, String relativePath) {
+ String cleanRelativePath =
+ StringUtils.isBlank(relativePath)
+ ? ""
+ : (relativePath.startsWith("/") ? relativePath.substring(1) : relativePath);
+ return new Path(targetBasePath, cleanRelativePath).toString();
+ }
+
+ private static String resolveRelativePath(String basePath, String fullFilePath) {
+ String base = normalizePathPart(basePath);
+ String file = normalizePathPart(fullFilePath);
+ if (StringUtils.isBlank(file)) {
+ return "";
+ }
+ if (StringUtils.isBlank(base)) {
+ return new Path(file).getName();
+ }
+ if (Objects.equals(base, file)) {
+ return new Path(file).getName();
+ }
+ String basePrefix = base.endsWith("/") ? base : base + "/";
+ if (file.startsWith(basePrefix)) {
+ return file.substring(basePrefix.length());
+ }
+ int idx = file.indexOf(basePrefix);
+ if (idx >= 0) {
+ return file.substring(idx + basePrefix.length());
+ }
+ return new Path(file).getName();
+ }
+
+ private static String normalizePathPart(String path) {
+ if (StringUtils.isBlank(path)) {
+ return path;
+ }
+ try {
+ return new Path(path).toUri().getPath();
+ } catch (Exception e) {
+ return path;
+ }
+ }
+
+ private static <E extends Enum<E>> E parseEnumValue(
+ Class<E> enumClass, String rawValue, String optionKey) {
+ if (StringUtils.isBlank(rawValue)) {
+ throw new FileConnectorException(
+ SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+ "Option '" + optionKey + "' must not be blank.");
+ }
+ String normalized = rawValue.trim().toUpperCase(Locale.ROOT);
+ for (E v : enumClass.getEnumConstants()) {
+ if (v.name().equalsIgnoreCase(normalized)) {
+ return v;
+ }
+ }
+ String supported =
+ Arrays.stream(enumClass.getEnumConstants())
+ .map(e -> e.name().toLowerCase(Locale.ROOT))
+ .reduce((a, b) -> a + ", " + b)
+ .orElse("");
+ throw new FileConnectorException(
+ SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+ "Unsupported " + optionKey + ": [" + rawValue + "], supported: " + supported + ".");
+ }
}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/UpdateSyncModeTest.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/UpdateSyncModeTest.java
new file mode 100644
index 0000000000..9455eeef6f
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/UpdateSyncModeTest.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.source.reader;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.attribute.FileTime;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT;
+
+@DisabledOnOs(
+ value = OS.WINDOWS,
+ disabledReason =
+ "Hadoop has windows problem, please refer https://cwiki.apache.org/confluence/display/HADOOP2/WindowsProblems")
+class UpdateSyncModeTest {
+
+ @TempDir Path tempDir;
+
+ @Test
+ void testDistcpDoesNotSupportChecksumCompareMode() throws Exception {
+ Path sourceDir = tempDir.resolve("src");
+ Path targetDir = tempDir.resolve("dst");
+
+ try (BinaryReadStrategy strategy = new BinaryReadStrategy()) {
+ Assertions.assertThrows(
+ FileConnectorException.class,
+ () ->
+ strategy.setPluginConfig(
+ updateConfig(
+ sourceDir.toUri().toString(),
+ targetDir.toUri().toString(),
+ "distcp",
+ "checksum")));
+ }
+ }
+
+ @Test
+ void testUpdateModeOnlySupportsBinaryFormat() throws Exception {
+ Path sourceDir = tempDir.resolve("src");
+ Path targetDir = tempDir.resolve("dst");
+
+ Map<String, Object> configMap = new HashMap<>();
+ configMap.put("path", sourceDir.toUri().toString());
+ configMap.put("file_format_type", "text");
+ configMap.put("sync_mode", "update");
+ configMap.put("target_path", targetDir.toUri().toString());
+
+ try (BinaryReadStrategy strategy = new BinaryReadStrategy()) {
+ Assertions.assertThrows(
+ FileConnectorException.class,
+ () -> strategy.setPluginConfig(ConfigFactory.parseMap(configMap)));
+ }
+ }
+
+ @Test
+ void testUpdateModeRequiresTargetPath() throws Exception {
+ Path sourceDir = tempDir.resolve("src");
+
+ Map<String, Object> configMap = new HashMap<>();
+ configMap.put("path", sourceDir.toUri().toString());
+ configMap.put("file_format_type", "binary");
+ configMap.put("sync_mode", "update");
+
+ try (BinaryReadStrategy strategy = new BinaryReadStrategy()) {
+ Assertions.assertThrows(
+ FileConnectorException.class,
+ () -> strategy.setPluginConfig(ConfigFactory.parseMap(configMap)));
+ }
+ }
+
+ @Test
+ void testDistcpSkipWhenTargetNewerAndSameLength() throws Exception {
+ Path sourceDir = tempDir.resolve("src");
+ Path targetDir = tempDir.resolve("dst");
+ Path sourceFile = sourceDir.resolve("a/b/test.bin");
+ Path targetFile = targetDir.resolve("a/b/test.bin");
+
+ writeFile(sourceFile, "abc".getBytes());
+ writeFile(targetFile, "abc".getBytes());
+ setMtime(sourceFile, 1_000);
+ setMtime(targetFile, 2_000);
+
+ try (BinaryReadStrategy strategy = new BinaryReadStrategy()) {
+ strategy.setPluginConfig(
+ updateConfig(
+ sourceDir.toUri().toString(),
+ targetDir.toUri().toString(),
+ "distcp",
+ "len_mtime"));
+ strategy.init(new LocalConf(FS_DEFAULT_NAME_DEFAULT));
+
+ List<String> files = strategy.getFileNamesByPath(sourceDir.toUri().toString());
+ Assertions.assertTrue(files.isEmpty(), "Target is newer with same len -> SKIP");
+ }
+ }
+
+ @Test
+ void testDistcpCopyWhenSourceNewer() throws Exception {
+ Path sourceDir = tempDir.resolve("src");
+ Path targetDir = tempDir.resolve("dst");
+ Path sourceFile = sourceDir.resolve("test.bin");
+ Path targetFile = targetDir.resolve("test.bin");
+
+ writeFile(sourceFile, "abc".getBytes());
+ writeFile(targetFile, "abc".getBytes());
+ setMtime(sourceFile, 2_000);
+ setMtime(targetFile, 1_000);
+
+ try (BinaryReadStrategy strategy = new BinaryReadStrategy()) {
+ strategy.setPluginConfig(
+ updateConfig(
+ sourceDir.toUri().toString(),
+ targetDir.toUri().toString(),
+ "distcp",
+ "len_mtime"));
+ strategy.init(new LocalConf(FS_DEFAULT_NAME_DEFAULT));
+
+ List<String> files = strategy.getFileNamesByPath(sourceDir.toUri().toString());
+ Assertions.assertEquals(1, files.size());
+ Assertions.assertTrue(files.get(0).endsWith("/test.bin"));
+ }
+ }
+
+ @Test
+ void testStrictChecksumSkipWhenSameContentEvenIfMtimeDiff() throws Exception {
+ Path sourceDir = tempDir.resolve("src");
+ Path targetDir = tempDir.resolve("dst");
+ Path sourceFile = sourceDir.resolve("test.bin");
+ Path targetFile = targetDir.resolve("test.bin");
+
+ writeFile(sourceFile, "abc".getBytes());
+ writeFile(targetFile, "abc".getBytes());
+ setMtime(sourceFile, 1_000);
+ setMtime(targetFile, 2_000);
+
+ try (BinaryReadStrategy strategy = new BinaryReadStrategy()) {
+ strategy.setPluginConfig(
+ updateConfig(
+ sourceDir.toUri().toString(),
+ targetDir.toUri().toString(),
+ "strict",
+ "checksum"));
+ strategy.init(new LocalConf(FS_DEFAULT_NAME_DEFAULT));
+
+ List<String> files = strategy.getFileNamesByPath(sourceDir.toUri().toString());
+ Assertions.assertTrue(files.isEmpty(), "Checksum equal -> SKIP");
+ }
+ }
+
+ @Test
+ void testStrictChecksumCopyWhenSameLengthButDifferentContent() throws Exception {
+ Path sourceDir = tempDir.resolve("src");
+ Path targetDir = tempDir.resolve("dst");
+ Path sourceFile = sourceDir.resolve("test.bin");
+ Path targetFile = targetDir.resolve("test.bin");
+
+ writeFile(sourceFile, "abc".getBytes());
+ writeFile(targetFile, "abd".getBytes());
+
+ try (BinaryReadStrategy strategy = new BinaryReadStrategy()) {
+ strategy.setPluginConfig(
+ updateConfig(
+ sourceDir.toUri().toString(),
+ targetDir.toUri().toString(),
+ "strict",
+ "checksum"));
+ strategy.init(new LocalConf(FS_DEFAULT_NAME_DEFAULT));
+
+ List<String> files = strategy.getFileNamesByPath(sourceDir.toUri().toString());
+ Assertions.assertEquals(1, files.size());
+ Assertions.assertTrue(files.get(0).endsWith("/test.bin"));
+ }
+ }
+
+ private static void writeFile(Path path, byte[] content) throws IOException {
+ Files.createDirectories(path.getParent());
+ Files.write(path, content);
+ }
+
+ private static void setMtime(Path path, long millis) throws IOException {
+ Files.setLastModifiedTime(path, FileTime.fromMillis(millis));
+ }
+
+ private static Config updateConfig(
+ String sourcePath, String targetPath, String updateStrategy, String compareMode) {
+ Map<String, Object> configMap = new HashMap<>();
+ configMap.put("path", sourcePath);
+ configMap.put("file_format_type", "binary");
+ configMap.put("sync_mode", "update");
+ configMap.put("target_path", targetPath);
+ configMap.put("update_strategy", updateStrategy);
+ configMap.put("compare_mode", compareMode);
+ return ConfigFactory.parseMap(configMap);
+ }
+
+ static class LocalConf extends HadoopConf {
+ private static final String HDFS_IMPL = "org.apache.hadoop.fs.LocalFileSystem";
+ private static final String SCHEMA = "file";
+
+ public LocalConf(String hdfsNameKey) {
+ super(hdfsNameKey);
+ }
+
+ @Override
+ public String getFsHdfsImpl() {
+ return HDFS_IMPL;
+ }
+
+ @Override
+ public String getSchema() {
+ return SCHEMA;
+ }
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java
index 7e0de70c6e..6f8211873a 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java
@@ -27,6 +27,7 @@ import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileSyncMode;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
import org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.config.HdfsSourceConfigOptions;
@@ -94,6 +95,15 @@ public class HdfsFileSourceFactory implements TableSourceFactory {
.optional(FileBaseSourceOptions.NULL_FORMAT)
.optional(FileBaseSourceOptions.FILENAME_EXTENSION)
.optional(FileBaseSourceOptions.READ_COLUMNS)
+ .optional(
+ FileBaseSourceOptions.SYNC_MODE,
+ FileBaseSourceOptions.TARGET_HADOOP_CONF,
+ FileBaseSourceOptions.UPDATE_STRATEGY,
+ FileBaseSourceOptions.COMPARE_MODE)
+ .conditional(
+ FileBaseSourceOptions.SYNC_MODE,
+ FileSyncMode.UPDATE,
+ FileBaseSourceOptions.TARGET_PATH)
.optional(FileBaseSourceOptions.HDFS_SITE_PATH)
.optional(FileBaseSourceOptions.KERBEROS_PRINCIPAL)
.optional(FileBaseSourceOptions.KERBEROS_KEYTAB_PATH)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/HdfsFileSourceConfigTest.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/HdfsFileSourceConfigTest.java
index 024eaaebd8..7920765322 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/HdfsFileSourceConfigTest.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/HdfsFileSourceConfigTest.java
@@ -26,10 +26,12 @@ import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.hdfs.config.HdfsFileSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.HdfsFileSourceFactory;
import org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.config.HdfsSourceConfigOptions;
+import org.apache.seatunnel.connectors.seatunnel.file.source.reader.BinaryReadStrategy;
import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ParquetReadStrategy;
import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategy;
import org.apache.seatunnel.connectors.seatunnel.source.SourceFlowTestUtils;
@@ -50,10 +52,14 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;
+import org.junit.jupiter.api.io.TempDir;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.attribute.FileTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -170,6 +176,42 @@ class HdfsFileSourceConfigTest {
Assertions.assertEquals("hdfs_multi_source_read4", seaTunnelRows.get(3).getField(1));
}
+ @Test
+ void testUpdateModeDistcpSkipStillProducesBinarySchema(@TempDir java.nio.file.Path tempDir)
+ throws IOException {
+ java.nio.file.Path sourceDir = tempDir.resolve("src");
+ java.nio.file.Path targetDir = tempDir.resolve("dst");
+ Files.createDirectories(sourceDir);
+ Files.createDirectories(targetDir);
+
+ java.nio.file.Path sourceFile = sourceDir.resolve("test.bin");
+ java.nio.file.Path targetFile = targetDir.resolve("test.bin");
+ Files.write(sourceFile, "abc".getBytes(StandardCharsets.UTF_8));
+ Files.write(targetFile, "abc".getBytes(StandardCharsets.UTF_8));
+ Files.setLastModifiedTime(sourceFile, FileTime.fromMillis(1_000));
+ Files.setLastModifiedTime(targetFile, FileTime.fromMillis(2_000));
+
+ Map<String, Object> configMap = new HashMap<>();
+ configMap.put(HdfsSourceConfigOptions.FILE_PATH.key(), sourceDir.toString());
+ configMap.put(HdfsSourceConfigOptions.FILE_FORMAT_TYPE.key(), "binary");
+ configMap.put(HdfsSourceConfigOptions.DEFAULT_FS.key(), DEFAULT_FS);
+ configMap.put(FileBaseSourceOptions.SYNC_MODE.key(), "update");
+ configMap.put(FileBaseSourceOptions.TARGET_PATH.key(), targetDir.toString());
+ configMap.put(FileBaseSourceOptions.UPDATE_STRATEGY.key(), "distcp");
+ configMap.put(FileBaseSourceOptions.COMPARE_MODE.key(), "len_mtime");
+
+ Config config = ConfigFactory.parseMap(configMap);
+ HdfsFileSourceConfig sourceConfig =
+ new HdfsFileSourceConfig(ReadonlyConfig.fromConfig(config));
+
+ Assertions.assertTrue(
+ sourceConfig.getFilePaths().isEmpty(),
+ "Update+distcp should filter files when target is newer and same length");
+ Assertions.assertEquals(
+ BinaryReadStrategy.binaryRowType,
+ sourceConfig.getCatalogTable().getSeaTunnelRowType());
+ }
+
@AfterEach
public void clear() throws IOException {
deleteFile(DATA_FILE_PATH1);
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-hadoop-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/hdfs/HdfsFileIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-hadoop-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/hdfs/HdfsFileIT.java
index 40de8f03a2..fde61f1f3a 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-hadoop-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/hdfs/HdfsFileIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-hadoop-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/hdfs/HdfsFileIT.java
@@ -128,4 +128,72 @@ public class HdfsFileIT extends TestSuiteBase implements TestResource {
container.executeJob("/hdfs_normal_to_assert.conf");
Assertions.assertEquals(0, readResult.getExitCode());
}
+
+ @TestTemplate
+ public void testHdfsBinaryUpdateModeDistcp(TestContainer container) throws IOException, InterruptedException {
+ resetUpdateTestPath();
+ putHdfsFile("/update/src/test.bin", "abc");
+
+ org.testcontainers.containers.Container.ExecResult firstRun =
+ container.executeJob("/hdfs_binary_update_distcp.conf");
+ Assertions.assertEquals(0, firstRun.getExitCode());
+ Assertions.assertEquals("abc", readHdfsFile("/update/dst/test.bin"));
+
+ // Make target newer with same length, distcp strategy should SKIP overwrite.
+ putHdfsFile("/update/dst/test.bin", "zzz");
+ org.testcontainers.containers.Container.ExecResult secondRun =
+ container.executeJob("/hdfs_binary_update_distcp.conf");
+ Assertions.assertEquals(0, secondRun.getExitCode());
+ Assertions.assertEquals("zzz", readHdfsFile("/update/dst/test.bin"));
+
+ // Change source length, distcp strategy should COPY overwrite.
+ putHdfsFile("/update/src/test.bin", "abcd");
+ org.testcontainers.containers.Container.ExecResult thirdRun =
+ container.executeJob("/hdfs_binary_update_distcp.conf");
+ Assertions.assertEquals(0, thirdRun.getExitCode());
+ Assertions.assertEquals("abcd", readHdfsFile("/update/dst/test.bin"));
+ }
+
+ @TestTemplate
+ public void testHdfsBinaryUpdateModeStrictChecksum(TestContainer container) throws IOException, InterruptedException {
+ resetUpdateTestPath();
+ putHdfsFile("/update/src/test.bin", "abc");
+
+ org.testcontainers.containers.Container.ExecResult firstRun =
+ container.executeJob("/hdfs_binary_update_strict_checksum.conf");
+ Assertions.assertEquals(0, firstRun.getExitCode());
+ Assertions.assertEquals("abc", readHdfsFile("/update/dst/test.bin"));
+
+ // Same length but different content, strict+checksum should COPY overwrite.
+ putHdfsFile("/update/dst/test.bin", "zzz");
+ org.testcontainers.containers.Container.ExecResult secondRun =
+ container.executeJob("/hdfs_binary_update_strict_checksum.conf");
+ Assertions.assertEquals(0, secondRun.getExitCode());
+ Assertions.assertEquals("abc", readHdfsFile("/update/dst/test.bin"));
+ }
+
+ private void resetUpdateTestPath() throws IOException, InterruptedException {
+ nameNode.execInContainer("bash", "-c", "hdfs dfs -rm -r -f /update || true");
+ org.testcontainers.containers.Container.ExecResult mkdirResult =
+ nameNode.execInContainer(
+ "hdfs", "dfs", "-mkdir", "-p", "/update/src", "/update/dst", "/update/tmp");
+ Assertions.assertEquals(0, mkdirResult.getExitCode());
+ }
+
+ private void putHdfsFile(String hdfsPath, String content) throws IOException, InterruptedException {
+ String command = "printf '" + content + "' | hdfs dfs -put -f - " + hdfsPath;
+ org.testcontainers.containers.Container.ExecResult putResult =
+ nameNode.execInContainer("bash", "-c", command);
+ Assertions.assertEquals(0, putResult.getExitCode());
+ }
+
+ private String readHdfsFile(String hdfsPath) throws IOException, InterruptedException {
+ org.testcontainers.containers.Container.ExecResult catResult =
+ nameNode.execInContainer("hdfs", "dfs", "-cat", hdfsPath);
+ Assertions.assertEquals(0, catResult.getExitCode());
+ return catResult.getStdout() == null ? "" : catResult.getStdout().trim();
+ }
}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-hadoop-e2e/src/test/resources/hdfs_binary_update_distcp.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-hadoop-e2e/src/test/resources/hdfs_binary_update_distcp.conf
new file mode 100644
index 0000000000..6dbdbdb999
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-hadoop-e2e/src/test/resources/hdfs_binary_update_distcp.conf
@@ -0,0 +1,53 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ HdfsFile {
+ fs.defaultFS = "hdfs://namenode1:9000"
+ path = "/update/src"
+ file_format_type = "binary"
+
+ sync_mode = "update"
+ target_path = "/update/dst"
+ update_strategy = "distcp"
+ compare_mode = "len_mtime"
+
+ hadoop_conf = {
+ "dfs.replication" = 1
+ }
+ }
+}
+
+sink {
+ HdfsFile {
+ fs.defaultFS = "hdfs://namenode1:9000"
+ path = "/update/dst"
+ tmp_path = "/update/tmp"
+ file_format_type = "binary"
+ data_save_mode = "APPEND_DATA"
+
+ hadoop_conf = {
+ "dfs.replication" = 1
+ }
+ }
+}
+
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-hadoop-e2e/src/test/resources/hdfs_binary_update_strict_checksum.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-hadoop-e2e/src/test/resources/hdfs_binary_update_strict_checksum.conf
new file mode 100644
index 0000000000..7dd180d43a
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-hadoop-e2e/src/test/resources/hdfs_binary_update_strict_checksum.conf
@@ -0,0 +1,53 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ HdfsFile {
+ fs.defaultFS = "hdfs://namenode1:9000"
+ path = "/update/src"
+ file_format_type = "binary"
+
+ sync_mode = "update"
+ target_path = "/update/dst"
+ update_strategy = "strict"
+ compare_mode = "checksum"
+
+ hadoop_conf = {
+ "dfs.replication" = 1
+ }
+ }
+}
+
+sink {
+ HdfsFile {
+ fs.defaultFS = "hdfs://namenode1:9000"
+ path = "/update/dst"
+ tmp_path = "/update/tmp"
+ file_format_type = "binary"
+ data_save_mode = "APPEND_DATA"
+
+ hadoop_conf = {
+ "dfs.replication" = 1
+ }
+ }
+}
+
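For reviewers skimming the diff, the copy-or-skip decision this commit adds to `AbstractReadStrategy` can be sketched as a standalone function. This is an illustrative sketch only: the class and method names below are hypothetical (not the committed API), it collapses strategy/compare-mode enums into strings, and it covers the length/mtime paths (the `strict` + `checksum` path compares `FileChecksum` values instead, as shown in the diff above).

```java
// Hypothetical standalone sketch of the sync_mode=update decision logic.
// Mirrors the len/mtime branches of shouldSyncFileInUpdateMode in this commit.
public class UpdateDecisionSketch {

    /** Returns true when the source file should be copied over the target. */
    public static boolean shouldSync(
            long sourceLen, long targetLen,
            long sourceMtime, long targetMtime,
            String updateStrategy) {
        // A length mismatch always triggers a copy, regardless of strategy.
        if (sourceLen != targetLen) {
            return true;
        }
        if ("distcp".equals(updateStrategy)) {
            // distcp semantics: copy only when the source is strictly newer.
            return sourceMtime > targetMtime;
        }
        // strict + len_mtime semantics: any mtime difference triggers a copy.
        return sourceMtime != targetMtime;
    }

    public static void main(String[] args) {
        // Same length, target newer: distcp skips, strict still copies.
        System.out.println(shouldSync(3, 3, 1_000, 2_000, "distcp")); // false
        System.out.println(shouldSync(3, 3, 1_000, 2_000, "strict")); // true
    }
}
```

This matches the e2e expectations above: the distcp job leaves a newer same-length target untouched, while the strict job overwrites it.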