This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 97aa22663 [flink] Support cloning table from latest snapshot (#3287)
97aa22663 is described below
commit 97aa2266394ccd0a248675e97f880c9e9ef29c54
Author: wangwj <[email protected]>
AuthorDate: Tue Jun 4 13:54:52 2024 +0800
[flink] Support cloning table from latest snapshot (#3287)
This closes #3287.
---
docs/content/migration/clone-tables.md | 82 ++++
.../main/java/org/apache/paimon/schema/Schema.java | 9 +
.../apache/paimon/flink/action/CloneAction.java | 150 ++++++
.../paimon/flink/action/CloneActionFactory.java | 89 ++++
.../apache/paimon/flink/clone/CloneFileInfo.java | 53 ++
.../paimon/flink/clone/CloneSourceBuilder.java | 122 +++++
.../paimon/flink/clone/CopyFileOperator.java | 126 +++++
.../flink/clone/PickFilesForCloneOperator.java | 136 ++++++
.../apache/paimon/flink/clone/PickFilesUtil.java | 206 ++++++++
.../flink/clone/SnapshotHintChannelComputer.java | 49 ++
.../paimon/flink/clone/SnapshotHintOperator.java | 101 ++++
.../services/org.apache.paimon.factories.Factory | 1 +
.../paimon/flink/action/CloneActionITCase.java | 532 +++++++++++++++++++++
.../paimon/flink/action/CloneActionSlowFileIO.java | 112 +++++
.../services/org.apache.paimon.fs.FileIOLoader | 16 +
15 files changed, 1784 insertions(+)
diff --git a/docs/content/migration/clone-tables.md
b/docs/content/migration/clone-tables.md
new file mode 100644
index 000000000..871baee6f
--- /dev/null
+++ b/docs/content/migration/clone-tables.md
@@ -0,0 +1,82 @@
+---
+title: "Clone Tables"
+weight: 3
+type: docs
+aliases:
+- /migration/clone-tables.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Clone Tables
+
+Paimon supports cloning tables for data migration.
+Currently, only table files used by the latest snapshot will be cloned.
+
+To clone a table, run the following command to submit a clone job.
+If the table is not being modified while it is cloned, it is recommended to
submit a Flink batch job for better performance.
+However, if you want to clone the table while it is still being written to,
submit a Flink streaming job for automatic failure recovery.
+
+```bash
+<FLINK_HOME>/bin/flink run \
+ /path/to/paimon-flink-action-{{< version >}}.jar \
+ clone \
+ --warehouse <source-warehouse-path> \
+ [--database <source-database-name>] \
+ [--table <source-table-name>] \
+ [--catalog_conf <source-paimon-catalog-conf> [--catalog_conf
<source-paimon-catalog-conf> ...]] \
+ --target_warehouse <target-warehouse-path> \
+ [--target_database <target-database>] \
+ [--target_table <target-table-name>] \
+ [--target_catalog_conf <target-paimon-catalog-conf> [--target_catalog_conf
<target-paimon-catalog-conf> ...]] \
+ [--parallelism <parallelism>]
+```
+
+{{< hint info >}}
+1. If `database` is not specified, all tables in all databases of the
specified warehouse will be cloned.
+2. If `table` is not specified, all tables of the specified database will be
cloned.
+{{< /hint >}}
+
+Example: Clone `test_db.test_table` from source warehouse to target warehouse.
+
+```bash
+<FLINK_HOME>/bin/flink run \
+ /path/to/paimon-flink-action-{{< version >}}.jar \
+ clone \
+ --warehouse s3:///path/to/warehouse_source \
+ --database test_db \
+ --table test_table \
+ --catalog_conf s3.endpoint=https://****.com \
+ --catalog_conf s3.access-key=***** \
+ --catalog_conf s3.secret-key=***** \
+ --target_warehouse s3:///path/to/warehouse_target \
+ --target_database test_db \
+ --target_table test_table \
+ --target_catalog_conf s3.endpoint=https://****.com \
+ --target_catalog_conf s3.access-key=***** \
+ --target_catalog_conf s3.secret-key=*****
+```
+
+For more usage details of the clone action, run:
+
+```bash
+<FLINK_HOME>/bin/flink run \
+ /path/to/paimon-flink-action-{{< version >}}.jar \
+ clone --help
+```
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java
b/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java
index 824dff5c4..b75758374 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java
@@ -329,4 +329,13 @@ public class Schema {
return new Schema(columns, partitionKeys, primaryKeys, options,
comment);
}
}
+
+ public static Schema fromTableSchema(TableSchema tableSchema) {
+ return new Schema(
+ tableSchema.fields(),
+ tableSchema.partitionKeys(),
+ tableSchema.primaryKeys(),
+ tableSchema.options(),
+ tableSchema.comment());
+ }
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneAction.java
new file mode 100644
index 000000000..caa676661
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneAction.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.flink.clone.CloneFileInfo;
+import org.apache.paimon.flink.clone.CloneSourceBuilder;
+import org.apache.paimon.flink.clone.CopyFileOperator;
+import org.apache.paimon.flink.clone.PickFilesForCloneOperator;
+import org.apache.paimon.flink.clone.SnapshotHintChannelComputer;
+import org.apache.paimon.flink.clone.SnapshotHintOperator;
+import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
+import org.apache.paimon.options.CatalogOptions;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
+import static org.apache.paimon.utils.StringUtils.isBlank;
+
+/** The Latest Snapshot clone action for Flink. */
+public class CloneAction extends ActionBase {
+
+ private final int parallelism;
+
+ private Map<String, String> sourceCatalogConfig;
+ private final String database;
+ private final String tableName;
+
+ private Map<String, String> targetCatalogConfig;
+ private final String targetDatabase;
+ private final String targetTableName;
+
+ public CloneAction(
+ String warehouse,
+ String database,
+ String tableName,
+ Map<String, String> sourceCatalogConfig,
+ String targetWarehouse,
+ String targetDatabase,
+ String targetTableName,
+ Map<String, String> targetCatalogConfig,
+ String parallelismStr) {
+ super(warehouse, sourceCatalogConfig);
+
+ checkNotNull(warehouse, "warehouse must not be null.");
+ checkNotNull(targetWarehouse, "targetWarehouse must not be null.");
+
+ this.parallelism =
+ isBlank(parallelismStr) ? env.getParallelism() :
Integer.parseInt(parallelismStr);
+
+ this.sourceCatalogConfig = new HashMap<>();
+ if (!sourceCatalogConfig.isEmpty()) {
+ this.sourceCatalogConfig = sourceCatalogConfig;
+ }
+ this.sourceCatalogConfig.put(CatalogOptions.WAREHOUSE.key(),
warehouse);
+ this.database = database;
+ this.tableName = tableName;
+
+ this.targetCatalogConfig = new HashMap<>();
+ if (!targetCatalogConfig.isEmpty()) {
+ this.targetCatalogConfig = targetCatalogConfig;
+ }
+ this.targetCatalogConfig.put(CatalogOptions.WAREHOUSE.key(),
targetWarehouse);
+ this.targetDatabase = targetDatabase;
+ this.targetTableName = targetTableName;
+ }
+
+ // ------------------------------------------------------------------------
+ // Java API
+ // ------------------------------------------------------------------------
+
+ @Override
+ public void build() {
+ try {
+ buildCloneFlinkJob(env);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void buildCloneFlinkJob(StreamExecutionEnvironment env) throws
Exception {
+ DataStream<Tuple2<String, String>> cloneSource =
+ new CloneSourceBuilder(
+ env,
+ sourceCatalogConfig,
+ database,
+ tableName,
+ targetDatabase,
+ targetTableName)
+ .build();
+
+ SingleOutputStreamOperator<CloneFileInfo> pickFilesForClone =
+ cloneSource
+ .transform(
+ "Pick Files",
+ TypeInformation.of(CloneFileInfo.class),
+ new PickFilesForCloneOperator(
+ sourceCatalogConfig,
targetCatalogConfig))
+ .forceNonParallel();
+
+ SingleOutputStreamOperator<CloneFileInfo> copyFiles =
+ pickFilesForClone
+ .rebalance()
+ .transform(
+ "Copy Files",
+ TypeInformation.of(CloneFileInfo.class),
+ new CopyFileOperator(sourceCatalogConfig,
targetCatalogConfig))
+ .setParallelism(parallelism);
+
+ SingleOutputStreamOperator<CloneFileInfo> snapshotHintOperator =
+ FlinkStreamPartitioner.partition(
+ copyFiles, new SnapshotHintChannelComputer(),
parallelism)
+ .transform(
+ "Recreate Snapshot Hint",
+ TypeInformation.of(CloneFileInfo.class),
+ new SnapshotHintOperator(targetCatalogConfig))
+ .setParallelism(parallelism);
+
+ snapshotHintOperator.addSink(new
DiscardingSink<>()).name("end").setParallelism(1);
+ }
+
+ @Override
+ public void run() throws Exception {
+ build();
+ execute("Clone job");
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneActionFactory.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneActionFactory.java
new file mode 100644
index 000000000..db45c8508
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneActionFactory.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import java.util.Optional;
+
+/** Factory to create {@link CloneAction}. */
+public class CloneActionFactory implements ActionFactory {
+
+ private static final String IDENTIFIER = "clone";
+ private static final String PARALLELISM = "parallelism";
+ private static final String TARGET_WAREHOUSE = "target_warehouse";
+ private static final String TARGET_DATABASE = "target_database";
+ private static final String TARGET_TABLE = "target_table";
+ private static final String TARGET_CATALOG_CONF = "target_catalog_conf";
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public Optional<Action> create(MultipleParameterToolAdapter params) {
+ CloneAction cloneAction =
+ new CloneAction(
+ params.get(WAREHOUSE),
+ params.get(DATABASE),
+ params.get(TABLE),
+ optionalConfigMap(params, CATALOG_CONF),
+ params.get(TARGET_WAREHOUSE),
+ params.get(TARGET_DATABASE),
+ params.get(TARGET_TABLE),
+ optionalConfigMap(params, TARGET_CATALOG_CONF),
+ params.get(PARALLELISM));
+
+ return Optional.of(cloneAction);
+ }
+
+ @Override
+ public void printHelp() {
+ System.out.println("Action \"clone\" runs a batch job for clone the
latest Snapshot.");
+ System.out.println();
+
+ System.out.println("Syntax:");
+ System.out.println(
+ " clone --warehouse <warehouse_path> "
+ + "[--database <database_name>] "
+ + "[--table <table_name>] "
+ + "[--catalog_conf <source-paimon-catalog-conf>
[--catalog_conf <source-paimon-catalog-conf> ...]] "
+ + "--target_warehouse <target_warehouse_path> "
+ + "[--target_database <target_database_name>] "
+ + "[--target_table <target_table_name>] "
+ + "[--target_catalog_conf <target-paimon-catalog-conf>
[--target_catalog_conf <target-paimon-catalog-conf> ...]] "
+ + "[--parallelism <parallelism>]");
+
+ System.out.println();
+
+ System.out.println("Examples:");
+ System.out.println(
+ " clone --warehouse s3:///path1/from/warehouse "
+ + "--database test_db "
+ + "--table test_table "
+ + "--catalog_conf s3.endpoint=https://****.com "
+ + "--catalog_conf s3.access-key=***** "
+ + "--catalog_conf s3.secret-key=***** "
+ + "--target_warehouse s3:///path2/to/warehouse "
+ + "--target_database test_db_copy "
+ + "--target_table test_table_copy "
+ + "--target_catalog_conf s3.endpoint=https://****.com "
+ + "--target_catalog_conf s3.access-key=***** "
+ + "--target_catalog_conf s3.secret-key=***** ");
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneFileInfo.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneFileInfo.java
new file mode 100644
index 000000000..d91695841
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneFileInfo.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.clone;
+
+/** Information about a file to be copied. */
+public class CloneFileInfo {
+
+ private final String filePathExcludeTableRoot;
+ private final String sourceIdentifier;
+ private final String targetIdentifier;
+
+ public CloneFileInfo(
+ String filePathExcludeTableRoot, String sourceIdentifier, String
targetIdentifier) {
+ this.filePathExcludeTableRoot = filePathExcludeTableRoot;
+ this.sourceIdentifier = sourceIdentifier;
+ this.targetIdentifier = targetIdentifier;
+ }
+
+ public String getFilePathExcludeTableRoot() {
+ return filePathExcludeTableRoot;
+ }
+
+ public String getSourceIdentifier() {
+ return sourceIdentifier;
+ }
+
+ public String getTargetIdentifier() {
+ return targetIdentifier;
+ }
+
+ @Override
+ public String toString() {
+ return String.format(
+ "{ filePath: %s, sourceIdentifier: %s, targetIdentifier: %s }",
+ filePathExcludeTableRoot, sourceIdentifier, targetIdentifier);
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneSourceBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneSourceBuilder.java
new file mode 100644
index 000000000..db0452873
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneSourceBuilder.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.clone;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.flink.FlinkCatalogFactory;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.utils.StringUtils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/**
+ * Picks the tables to be cloned based on the user input parameters. The record
type of the built
+ * DataStream is {@link Tuple2}. The left element is the identifier of the source
table and the right
+ * element is the identifier of the target table.
+ */
+public class CloneSourceBuilder {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(CloneSourceBuilder.class);
+
+ private final StreamExecutionEnvironment env;
+ private final Map<String, String> sourceCatalogConfig;
+ private final String database;
+ private final String tableName;
+ private final String targetDatabase;
+ private final String targetTableName;
+
+ public CloneSourceBuilder(
+ StreamExecutionEnvironment env,
+ Map<String, String> sourceCatalogConfig,
+ String database,
+ String tableName,
+ String targetDatabase,
+ String targetTableName) {
+ this.env = env;
+ this.sourceCatalogConfig = sourceCatalogConfig;
+ this.database = database;
+ this.tableName = tableName;
+ this.targetDatabase = targetDatabase;
+ this.targetTableName = targetTableName;
+ }
+
+ public DataStream<Tuple2<String, String>> build() throws Exception {
+ try (Catalog sourceCatalog =
+
FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(sourceCatalogConfig))) {
+ return build(sourceCatalog);
+ }
+ }
+
+ private DataStream<Tuple2<String, String>> build(Catalog sourceCatalog)
throws Exception {
+ List<Tuple2<String, String>> result = new ArrayList<>();
+
+ if (database == null) {
+ checkArgument(
+ StringUtils.isBlank(tableName),
+ "tableName must be blank when database is null.");
+ checkArgument(
+ StringUtils.isBlank(targetDatabase),
+ "targetDatabase must be blank when clone all tables in a
catalog.");
+ checkArgument(
+ StringUtils.isBlank(targetTableName),
+ "targetTableName must be blank when clone all tables in a
catalog.");
+ for (String db : sourceCatalog.listDatabases()) {
+ for (String table : sourceCatalog.listTables(db)) {
+ String s = db + "." + table;
+ result.add(new Tuple2<>(s, s));
+ }
+ }
+ } else if (tableName == null) {
+ checkArgument(
+ !StringUtils.isBlank(targetDatabase),
+ "targetDatabase must not be blank when clone all tables in
a database.");
+ checkArgument(
+ StringUtils.isBlank(targetTableName),
+ "targetTableName must be blank when clone all tables in a
catalog.");
+ for (String table : sourceCatalog.listTables(database)) {
+ result.add(new Tuple2<>(database + "." + table, targetDatabase
+ "." + table));
+ }
+ } else {
+ checkArgument(
+ !StringUtils.isBlank(targetDatabase),
+ "targetDatabase must not be blank when clone a table.");
+ checkArgument(
+ !StringUtils.isBlank(targetTableName),
+ "targetTableName must not be blank when clone a table.");
+ result.add(
+ new Tuple2<>(
+ database + "." + tableName, targetDatabase + "." +
targetTableName));
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("The clone identifiers of source table and target table
are: {}", result);
+ }
+ return env.fromCollection(result).forceNonParallel().forward();
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java
new file mode 100644
index 000000000..5b320f4e9
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.clone;
+
+import org.apache.paimon.catalog.AbstractCatalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.FlinkCatalogFactory;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.utils.IOUtils;
+
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/** An operator that copies files. */
+public class CopyFileOperator extends AbstractStreamOperator<CloneFileInfo>
+ implements OneInputStreamOperator<CloneFileInfo, CloneFileInfo> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(CopyFileOperator.class);
+
+ private final Map<String, String> sourceCatalogConfig;
+ private final Map<String, String> targetCatalogConfig;
+
+ private AbstractCatalog sourceCatalog;
+ private AbstractCatalog targetCatalog;
+
+ public CopyFileOperator(
+ Map<String, String> sourceCatalogConfig, Map<String, String>
targetCatalogConfig) {
+ this.sourceCatalogConfig = sourceCatalogConfig;
+ this.targetCatalogConfig = targetCatalogConfig;
+ }
+
+ @Override
+ public void open() throws Exception {
+ sourceCatalog =
+ (AbstractCatalog)
+ FlinkCatalogFactory.createPaimonCatalog(
+ Options.fromMap(sourceCatalogConfig));
+ targetCatalog =
+ (AbstractCatalog)
+ FlinkCatalogFactory.createPaimonCatalog(
+ Options.fromMap(targetCatalogConfig));
+ }
+
+ @Override
+ public void processElement(StreamRecord<CloneFileInfo> streamRecord)
throws Exception {
+ CloneFileInfo cloneFileInfo = streamRecord.getValue();
+
+ FileIO sourceTableFileIO = sourceCatalog.fileIO();
+ FileIO targetTableFileIO = targetCatalog.fileIO();
+ Path sourceTableRootPath =
+ sourceCatalog.getDataTableLocation(
+
Identifier.fromString(cloneFileInfo.getSourceIdentifier()));
+ Path targetTableRootPath =
+ targetCatalog.getDataTableLocation(
+
Identifier.fromString(cloneFileInfo.getTargetIdentifier()));
+
+ String filePathExcludeTableRoot =
cloneFileInfo.getFilePathExcludeTableRoot();
+ Path sourcePath = new Path(sourceTableRootPath +
filePathExcludeTableRoot);
+ Path targetPath = new Path(targetTableRootPath +
filePathExcludeTableRoot);
+
+ if (targetTableFileIO.exists(targetPath)
+ && targetTableFileIO.getFileSize(targetPath)
+ == sourceTableFileIO.getFileSize(sourcePath)) {
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Skipping clone target file {} because it already
exists and has the same size.",
+ targetPath);
+ }
+
+ // We still send record to SnapshotHintOperator to avoid the
following corner case:
+ //
+ // When cloning two tables under a catalog, after clone table A is
completed,
+ // the job fails due to snapshot expiration when cloning table B.
+ // If we don't re-send file information of table A to
SnapshotHintOperator,
+ // the snapshot hint file of A will not be created after the
restart.
+ output.collect(streamRecord);
+ return;
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Begin copy file from {} to {}.", sourcePath,
targetPath);
+ }
+ IOUtils.copyBytes(
+ sourceTableFileIO.newInputStream(sourcePath),
+ targetTableFileIO.newOutputStream(targetPath, true));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("End copy file from {} to {}.", sourcePath, targetPath);
+ }
+
+ output.collect(streamRecord);
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (sourceCatalog != null) {
+ sourceCatalog.close();
+ }
+ if (targetCatalog != null) {
+ targetCatalog.close();
+ }
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesForCloneOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesForCloneOperator.java
new file mode 100644
index 000000000..883d7b06a
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesForCloneOperator.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.clone;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.FlinkCatalogFactory;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.Preconditions;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Picks the files of a table to be cloned based on the input record. The
record type it produces is
+ * CloneFileInfo, which describes a file to be copied.
+ */
+public class PickFilesForCloneOperator extends
AbstractStreamOperator<CloneFileInfo>
+ implements OneInputStreamOperator<Tuple2<String, String>,
CloneFileInfo> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(PickFilesForCloneOperator.class);
+
+ private final Map<String, String> sourceCatalogConfig;
+ private final Map<String, String> targetCatalogConfig;
+
+ private Catalog sourceCatalog;
+ private Catalog targetCatalog;
+
+ public PickFilesForCloneOperator(
+ Map<String, String> sourceCatalogConfig, Map<String, String>
targetCatalogConfig) {
+ this.sourceCatalogConfig = sourceCatalogConfig;
+ this.targetCatalogConfig = targetCatalogConfig;
+ }
+
+ @Override
+ public void open() throws Exception {
+ sourceCatalog =
+
FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(sourceCatalogConfig));
+ targetCatalog =
+
FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(targetCatalogConfig));
+ }
+
+ @Override
+ public void processElement(StreamRecord<Tuple2<String, String>>
streamRecord) throws Exception {
+ String sourceIdentifierStr = streamRecord.getValue().f0;
+ Identifier sourceIdentifier =
Identifier.fromString(sourceIdentifierStr);
+ String targetIdentifierStr = streamRecord.getValue().f1;
+ Identifier targetIdentifier =
Identifier.fromString(targetIdentifierStr);
+
+ FileStoreTable sourceTable = (FileStoreTable)
sourceCatalog.getTable(sourceIdentifier);
+ targetCatalog.createDatabase(targetIdentifier.getDatabaseName(), true);
+ targetCatalog.createTable(
+ targetIdentifier,
Schema.fromTableSchema(sourceTable.schema()), true);
+
+ List<CloneFileInfo> result =
+ toCloneFileInfos(
+
PickFilesUtil.getUsedFilesForLatestSnapshot(sourceTable),
+ sourceTable.location(),
+ sourceIdentifierStr,
+ targetIdentifierStr);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("The CloneFileInfo of table {} is {} : ",
sourceTable.location(), result);
+ }
+
+ for (CloneFileInfo info : result) {
+ output.collect(new StreamRecord<>(info));
+ }
+ }
+
+ private List<CloneFileInfo> toCloneFileInfos(
+ List<Path> files,
+ Path sourceTableRoot,
+ String sourceIdentifier,
+ String targetIdentifier) {
+ List<CloneFileInfo> result = new ArrayList<>();
+ for (Path file : files) {
+ Path relativePath = getPathExcludeTableRoot(file, sourceTableRoot);
+ result.add(
+ new CloneFileInfo(relativePath.toString(),
sourceIdentifier, targetIdentifier));
+ }
+ return result;
+ }
+
+ private Path getPathExcludeTableRoot(Path absolutePath, Path
sourceTableRoot) {
+ String fileAbsolutePath = absolutePath.toUri().toString();
+ String sourceTableRootPath = sourceTableRoot.toString();
+
+ Preconditions.checkState(
+ fileAbsolutePath.startsWith(sourceTableRootPath),
+ "File absolute path does not start with source table root
path. This is unexpected. "
+ + "fileAbsolutePath is: "
+ + fileAbsolutePath
+ + ", sourceTableRootPath is: "
+ + sourceTableRootPath);
+
+ return new
Path(fileAbsolutePath.substring(sourceTableRootPath.length()));
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (sourceCatalog != null) {
+ sourceCatalog.close();
+ }
+ if (targetCatalog != null) {
+ targetCatalog.close();
+ }
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesUtil.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesUtil.java
new file mode 100644
index 000000000..f83b5cf8f
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesUtil.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.clone;
+
+import org.apache.paimon.FileStore;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.index.IndexFileHandler;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.manifest.ManifestFileMeta;
+import org.apache.paimon.manifest.ManifestList;
+import org.apache.paimon.manifest.SimpleFileEntry;
+import org.apache.paimon.operation.FileStoreScan;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.SnapshotManager;
+
+import javax.annotation.Nullable;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/** Utility for collecting the paths of all files used by the latest snapshot of a table. */
+public class PickFilesUtil {
+
+    /** Maximum number of attempts made by {@link #retryReadingFiles}. */
+    private static final int READ_FILE_RETRY_NUM = 3;
+
+    /** Pause between two consecutive read attempts, in milliseconds. */
+    private static final int READ_FILE_RETRY_INTERVAL = 5;
+
+    /**
+     * Lists the files used by the latest snapshot of {@code table}, together with all schema
+     * files.
+     *
+     * <p>When the table has a snapshot, the result starts with the snapshot file, followed by the
+     * files reported by {@link #getUsedFilesInternal}. Schema files are always appended last, so a
+     * table without any snapshot yields only its schema files.
+     *
+     * @param table table whose files should be listed
+     * @return paths of the snapshot file (if any), the files it uses and every schema file
+     */
+    public static List<Path> getUsedFilesForLatestSnapshot(FileStoreTable table) {
+        FileStore<?> store = table.store();
+        SnapshotManager snapshotManager = store.snapshotManager();
+        Snapshot snapshot = snapshotManager.latestSnapshot();
+        ManifestList manifestList = store.manifestListFactory().create();
+        SchemaManager schemaManager = new SchemaManager(table.fileIO(), table.location());
+        IndexFileHandler indexFileHandler = store.newIndexFileHandler();
+
+        List<Path> files = new ArrayList<>();
+        if (snapshot != null) {
+            files.add(snapshotManager.snapshotPath(snapshot.id()));
+            files.addAll(
+                    getUsedFilesInternal(
+                            snapshot,
+                            store.pathFactory(),
+                            store.newScan(),
+                            manifestList,
+                            indexFileHandler));
+        }
+        for (long id : schemaManager.listAllIds()) {
+            files.add(schemaManager.toSchemaPath(id));
+        }
+        return files;
+    }
+
+    /**
+     * Collects manifest lists, manifests, data files, index files and the statistics file used by
+     * {@code snapshot}.
+     *
+     * <p>Returns an empty list when reading the manifest lists or the index manifest hits a {@link
+     * FileNotFoundException} (signalled by {@link #retryReadingFiles} returning {@code null}); the
+     * paths gathered up to that point are discarded.
+     *
+     * @throws RuntimeException wrapping any other {@link IOException} raised while reading
+     */
+    private static List<Path> getUsedFilesInternal(
+            Snapshot snapshot,
+            FileStorePathFactory pathFactory,
+            FileStoreScan scan,
+            ManifestList manifestList,
+            IndexFileHandler indexFileHandler) {
+        List<Path> files = new ArrayList<>();
+        addManifestList(files, snapshot, pathFactory);
+
+        try {
+            // try to read manifests
+            List<ManifestFileMeta> manifestFileMetas =
+                    retryReadingFiles(
+                            () -> readAllManifestsWithIOException(snapshot, manifestList));
+            if (manifestFileMetas == null) {
+                return Collections.emptyList();
+            }
+            manifestFileMetas.stream()
+                    .map(ManifestFileMeta::fileName)
+                    .map(pathFactory::toManifestFilePath)
+                    .forEach(files::add);
+
+            // try to read data files
+            List<Path> dataFiles = new ArrayList<>();
+            for (SimpleFileEntry entry : scan.withSnapshot(snapshot).readSimpleEntries()) {
+                dataFiles.add(
+                        pathFactory
+                                .createDataFilePathFactory(entry.partition(), entry.bucket())
+                                .toPath(entry.fileName()));
+            }
+
+            // When scanning, dataFiles are listed from older to newer.
+            // By reversing dataFiles, newer files will be copied first.
+            //
+            // We do this because new files are from the latest partition, and are prone to be
+            // deleted. Older files however, are from previous partitions and should not be
+            // changed very often.
+            Collections.reverse(dataFiles);
+            files.addAll(dataFiles);
+
+            // try to read index files
+            String indexManifest = snapshot.indexManifest();
+            if (indexManifest != null && indexFileHandler.existsManifest(indexManifest)) {
+                files.add(pathFactory.indexManifestFileFactory().toPath(indexManifest));
+
+                List<IndexManifestEntry> indexManifestEntries =
+                        retryReadingFiles(
+                                () -> indexFileHandler.readManifestWithIOException(indexManifest));
+                if (indexManifestEntries == null) {
+                    return Collections.emptyList();
+                }
+
+                indexManifestEntries.stream()
+                        .map(IndexManifestEntry::indexFile)
+                        .map(indexFileHandler::filePath)
+                        .forEach(files::add);
+            }
+
+            // add statistic file
+            if (snapshot.statistics() != null) {
+                files.add(pathFactory.statsFileFactory().toPath(snapshot.statistics()));
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        return files;
+    }
+
+    /**
+     * Appends the base and delta manifest list paths of {@code snapshot} to {@code used}, plus the
+     * changelog manifest list path when the snapshot has one.
+     */
+    private static void addManifestList(
+            List<Path> used, Snapshot snapshot, FileStorePathFactory pathFactory) {
+        used.add(pathFactory.toManifestListPath(snapshot.baseManifestList()));
+        used.add(pathFactory.toManifestListPath(snapshot.deltaManifestList()));
+        String changelogManifestList = snapshot.changelogManifestList();
+        if (changelogManifestList != null) {
+            used.add(pathFactory.toManifestListPath(changelogManifestList));
+        }
+    }
+
+    /**
+     * Reads the manifest metas of the base, delta and (if present) changelog manifest lists of
+     * {@code snapshot}, in that order.
+     *
+     * @throws IOException if any manifest list cannot be read
+     */
+    private static List<ManifestFileMeta> readAllManifestsWithIOException(
+            Snapshot snapshot, ManifestList manifestList) throws IOException {
+        List<ManifestFileMeta> result = new ArrayList<>();
+
+        result.addAll(manifestList.readWithIOException(snapshot.baseManifestList()));
+        result.addAll(manifestList.readWithIOException(snapshot.deltaManifestList()));
+
+        String changelogManifestList = snapshot.changelogManifestList();
+        if (changelogManifestList != null) {
+            result.addAll(manifestList.readWithIOException(changelogManifestList));
+        }
+
+        return result;
+    }
+
+    /**
+     * Runs {@code reader} up to {@link #READ_FILE_RETRY_NUM} times, pausing {@link
+     * #READ_FILE_RETRY_INTERVAL} milliseconds between attempts.
+     *
+     * @return the value read, or {@code null} as soon as an attempt hits a {@link
+     *     FileNotFoundException}
+     * @throws IOException the failure of the last attempt, once all attempts are used up
+     * @throws RuntimeException if the thread is interrupted while pausing; the interrupt flag is
+     *     restored first
+     */
+    @Nullable
+    private static <T> T retryReadingFiles(ReaderWithIOException<T> reader) throws IOException {
+        IOException caught = null;
+        for (int attempt = 1; attempt <= READ_FILE_RETRY_NUM; attempt++) {
+            try {
+                return reader.read();
+            } catch (FileNotFoundException e) {
+                // A missing file will not come back, so retrying is pointless.
+                return null;
+            } catch (IOException e) {
+                caught = e;
+            }
+
+            // Only pause when another attempt follows; sleeping before giving up merely delays
+            // the failure.
+            if (attempt < READ_FILE_RETRY_NUM) {
+                try {
+                    TimeUnit.MILLISECONDS.sleep(READ_FILE_RETRY_INTERVAL);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+
+        throw caught;
+    }
+
+    /** A helper functional interface for method {@link #retryReadingFiles}. */
+    @FunctionalInterface
+    interface ReaderWithIOException<T> {
+        T read() throws IOException;
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/SnapshotHintChannelComputer.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/SnapshotHintChannelComputer.java
new file mode 100644
index 000000000..c947ab1e6
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/SnapshotHintChannelComputer.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.clone;
+
+import org.apache.paimon.table.sink.ChannelComputer;
+
+/**
+ * {@link ChannelComputer} for distributing {@link CloneFileInfo} records into
SnapshotHintOperator
+ * for recreate snapshot hints.
+ *
+ * <p>{@link CloneFileInfo}s are distributed by target table name, so the
creation of snapshot hints
+ * of each table is handled by only one parallelism.
+ */
+public class SnapshotHintChannelComputer implements
ChannelComputer<CloneFileInfo> {
+
+ private static final long serialVersionUID = 1L;
+
+ private int numChannels;
+
+ @Override
+ public void setup(int numChannels) {
+ this.numChannels = numChannels;
+ }
+
+ @Override
+ public int channel(CloneFileInfo record) {
+ int hsh = 0;
+ for (int i = 0; i < record.getTargetIdentifier().length(); i++) {
+ hsh = (hsh * 131 + record.getTargetIdentifier().charAt(i)) %
numChannels;
+ }
+ return hsh;
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/SnapshotHintOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/SnapshotHintOperator.java
new file mode 100644
index 000000000..938119f94
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/SnapshotHintOperator.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.clone;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.FlinkCatalogFactory;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.SnapshotManager;
+
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.OptionalLong;
+import java.util.Set;
+
+/**
+ * Creates the snapshot hint files of every target table once all {@link CloneFileInfo} records
+ * have been received.
+ *
+ * <p>Incoming records are only used to remember which target tables were touched; nothing is
+ * emitted downstream by this operator.
+ */
+public class SnapshotHintOperator extends AbstractStreamOperator<CloneFileInfo>
+        implements OneInputStreamOperator<CloneFileInfo, CloneFileInfo>, BoundedOneInput {
+
+    private final Map<String, String> targetCatalogConfig;
+
+    private Catalog targetCatalog;
+    private Set<String> identifiers;
+
+    public SnapshotHintOperator(Map<String, String> targetCatalogConfig) {
+        this.targetCatalogConfig = targetCatalogConfig;
+    }
+
+    /** Opens the target catalog and resets the set of seen target identifiers. */
+    @Override
+    public void open() throws Exception {
+        targetCatalog =
+                FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(targetCatalogConfig));
+        identifiers = new HashSet<>();
+    }
+
+    /** Remembers the target table identifier carried by the record. */
+    @Override
+    public void processElement(StreamRecord<CloneFileInfo> streamRecord) throws Exception {
+        String identifier = streamRecord.getValue().getTargetIdentifier();
+        identifiers.add(identifier);
+    }
+
+    /** Commits snapshot hints for every target table seen in the input. */
+    @Override
+    public void endInput() throws Exception {
+        for (String identifier : identifiers) {
+            FileStoreTable targetTable =
+                    (FileStoreTable) targetCatalog.getTable(Identifier.fromString(identifier));
+            commitSnapshotHintInTargetTable(targetTable.snapshotManager());
+        }
+    }
+
+    /**
+     * Points both the earliest and the latest hint of the target table at its highest snapshot id
+     * and quietly deletes the snapshot files of all other ids. Does nothing when the table has no
+     * snapshot.
+     *
+     * @throws IOException if the snapshots cannot be listed or a hint cannot be committed
+     */
+    private void commitSnapshotHintInTargetTable(SnapshotManager targetTableSnapshotManager)
+            throws IOException {
+        // List the snapshots once and reuse the ids for both finding the maximum and pruning.
+        Set<Long> snapshotIds = new HashSet<>();
+        for (Snapshot snapshot : targetTableSnapshotManager.safelyGetAllSnapshots()) {
+            snapshotIds.add(snapshot.id());
+        }
+
+        OptionalLong optionalSnapshotId = snapshotIds.stream().mapToLong(Long::longValue).max();
+        if (!optionalSnapshotId.isPresent()) {
+            return;
+        }
+
+        long snapshotId = optionalSnapshotId.getAsLong();
+        targetTableSnapshotManager.commitEarliestHint(snapshotId);
+        targetTableSnapshotManager.commitLatestHint(snapshotId);
+        for (long id : snapshotIds) {
+            if (id != snapshotId) {
+                targetTableSnapshotManager
+                        .fileIO()
+                        .deleteQuietly(targetTableSnapshotManager.snapshotPath(id));
+            }
+        }
+    }
+
+    /** Closes the target catalog if it was opened. */
+    @Override
+    public void close() throws Exception {
+        if (targetCatalog != null) {
+            targetCatalog.close();
+        }
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 33a43009d..38cb36713 100644
---
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -14,6 +14,7 @@
# limitations under the License.
### action factories
+org.apache.paimon.flink.action.CloneActionFactory
org.apache.paimon.flink.action.CompactActionFactory
org.apache.paimon.flink.action.CompactDatabaseActionFactory
org.apache.paimon.flink.action.DropPartitionActionFactory
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CloneActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CloneActionITCase.java
new file mode 100644
index 000000000..74cb44cbe
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CloneActionITCase.java
@@ -0,0 +1,532 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.clone.PickFilesUtil;
+import org.apache.paimon.flink.util.AbstractTestBase;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.Pair;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.paimon.utils.Preconditions.checkState;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT cases for {@link CloneAction}. */
+public class CloneActionITCase extends AbstractTestBase {
+
+    // ------------------------------------------------------------------------
+    //  Constructed Tests
+    // ------------------------------------------------------------------------
+
+    /** Clones a single table into a differently named database and table. */
+    @Test
+    public void testCloneTable() throws Exception {
+        String sourceWarehouse = getTempDirPath("source-ware");
+        prepareData(sourceWarehouse);
+
+        String targetWarehouse = getTempDirPath("target-ware");
+        String[] args =
+                new String[] {
+                    "clone",
+                    "--warehouse",
+                    sourceWarehouse,
+                    "--database",
+                    "db1",
+                    "--table",
+                    "t1",
+                    "--target_warehouse",
+                    targetWarehouse,
+                    "--target_database",
+                    "mydb",
+                    "--target_table",
+                    "myt"
+                };
+        ActionFactory.createAction(args).get().run();
+
+        // check result
+        TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
+        createAndUseCatalog(tEnv, "targetcat", targetWarehouse);
+
+        List<String> actual = collect(tEnv, "SELECT pt, k, v FROM mydb.myt ORDER BY pt, k");
+        assertThat(actual)
+                .containsExactly(
+                        "+I[one, 1, 10]", "+I[one, 2, 21]", "+I[two, 1, 101]", "+I[two, 2, 200]");
+        compareCloneFiles(sourceWarehouse, "db1", "t1", targetWarehouse, "mydb", "myt");
+    }
+
+    /** Clones all tables of one database into a differently named database. */
+    @Test
+    public void testCloneDatabase() throws Exception {
+        String sourceWarehouse = getTempDirPath("source-ware");
+        prepareData(sourceWarehouse);
+
+        String targetWarehouse = getTempDirPath("target-ware");
+        String[] args =
+                new String[] {
+                    "clone",
+                    "--warehouse",
+                    sourceWarehouse,
+                    "--database",
+                    "db1",
+                    "--target_warehouse",
+                    targetWarehouse,
+                    "--target_database",
+                    "mydb"
+                };
+        ActionFactory.createAction(args).get().run();
+
+        // check result
+        TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
+        createAndUseCatalog(tEnv, "targetcat", targetWarehouse);
+
+        List<String> actual = collect(tEnv, "SELECT pt, k, v FROM mydb.t1 ORDER BY pt, k");
+        assertThat(actual)
+                .containsExactly(
+                        "+I[one, 1, 10]", "+I[one, 2, 21]", "+I[two, 1, 101]", "+I[two, 2, 200]");
+        compareCloneFiles(sourceWarehouse, "db1", "t1", targetWarehouse, "mydb", "t1");
+
+        actual = collect(tEnv, "SELECT k, v FROM mydb.t2 ORDER BY k");
+        assertThat(actual)
+                .containsExactly("+I[10, 100]", "+I[20, 201]", "+I[100, 1001]", "+I[200, 2000]");
+        compareCloneFiles(sourceWarehouse, "db1", "t2", targetWarehouse, "mydb", "t2");
+    }
+
+    /** Clones a whole warehouse, keeping database and table names. */
+    @Test
+    public void testCloneWarehouse() throws Exception {
+        String sourceWarehouse = getTempDirPath("source-ware");
+        prepareData(sourceWarehouse);
+
+        String targetWarehouse = getTempDirPath("target-ware");
+        String[] args =
+                new String[] {
+                    "clone", "--warehouse", sourceWarehouse, "--target_warehouse", targetWarehouse
+                };
+        ActionFactory.createAction(args).get().run();
+
+        // check result
+        TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
+        createAndUseCatalog(tEnv, "targetcat", targetWarehouse);
+
+        List<String> actual = collect(tEnv, "SELECT pt, k, v FROM db1.t1 ORDER BY pt, k");
+        assertThat(actual)
+                .containsExactly(
+                        "+I[one, 1, 10]", "+I[one, 2, 21]", "+I[two, 1, 101]", "+I[two, 2, 200]");
+        compareCloneFiles(sourceWarehouse, "db1", "t1", targetWarehouse, "db1", "t1");
+
+        actual = collect(tEnv, "SELECT k, v FROM db1.t2 ORDER BY k");
+        assertThat(actual)
+                .containsExactly("+I[10, 100]", "+I[20, 201]", "+I[100, 1001]", "+I[200, 2000]");
+        compareCloneFiles(sourceWarehouse, "db1", "t2", targetWarehouse, "db1", "t2");
+
+        actual = collect(tEnv, "SELECT pt, k, v FROM db2.t3 ORDER BY pt, k");
+        assertThat(actual)
+                .containsExactly(
+                        "+I[1, 1, one]",
+                        "+I[1, 2, twenty]",
+                        "+I[2, 1, banana]",
+                        "+I[2, 2, orange]");
+        compareCloneFiles(sourceWarehouse, "db2", "t3", targetWarehouse, "db2", "t3");
+
+        actual = collect(tEnv, "SELECT k, v FROM db2.t4 ORDER BY k");
+        assertThat(actual)
+                .containsExactly(
+                        "+I[10, one]", "+I[20, twenty]", "+I[100, banana]", "+I[200, orange]");
+        compareCloneFiles(sourceWarehouse, "db2", "t4", targetWarehouse, "db2", "t4");
+    }
+
+    /**
+     * Creates databases db1 and db2 in the source warehouse and fills tables db1.t1, db1.t2,
+     * db2.t3 and db2.t4 with two batches of rows each.
+     */
+    private void prepareData(String sourceWarehouse) throws Exception {
+        TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
+        createAndUseCatalog(tEnv, "sourcecat", sourceWarehouse);
+
+        tEnv.executeSql("CREATE DATABASE db1");
+        tEnv.executeSql("CREATE DATABASE db2");
+
+        // prepare data: db1.t1
+        tEnv.executeSql(
+                "CREATE TABLE db1.t1 (\n"
+                        + " pt STRING,\n"
+                        + " k INT,\n"
+                        + " v INT,\n"
+                        + " PRIMARY KEY (pt, k) NOT ENFORCED\n"
+                        + ") PARTITIONED BY (pt) WITH (\n"
+                        + " 'changelog-producer' = 'lookup'\n"
+                        + ")");
+        tEnv.executeSql(
+                        "INSERT INTO db1.t1 VALUES "
+                                + "('one', 1, 10), "
+                                + "('one', 2, 20), "
+                                + "('two', 1, 100)")
+                .await();
+        tEnv.executeSql(
+                        "INSERT INTO db1.t1 VALUES "
+                                + "('one', 2, 21), "
+                                + "('two', 1, 101), "
+                                + "('two', 2, 200)")
+                .await();
+
+        // prepare data: db1.t2
+        tEnv.executeSql(
+                "CREATE TABLE db1.t2 (\n"
+                        + " k INT,\n"
+                        + " v INT,\n"
+                        + " PRIMARY KEY (k) NOT ENFORCED\n"
+                        + ") WITH (\n"
+                        + " 'changelog-producer' = 'lookup'\n"
+                        + ")");
+        tEnv.executeSql(
+                        "INSERT INTO db1.t2 VALUES "
+                                + "(10, 100), "
+                                + "(20, 200), "
+                                + "(100, 1000)")
+                .await();
+        tEnv.executeSql(
+                        "INSERT INTO db1.t2 VALUES "
+                                + "(20, 201), "
+                                + "(100, 1001), "
+                                + "(200, 2000)")
+                .await();
+
+        // prepare data: db2.t3
+        tEnv.executeSql(
+                "CREATE TABLE db2.t3 (\n"
+                        + " pt INT,\n"
+                        + " k INT,\n"
+                        + " v STRING,\n"
+                        + " PRIMARY KEY (pt, k) NOT ENFORCED\n"
+                        + ") PARTITIONED BY (pt) WITH (\n"
+                        + " 'changelog-producer' = 'lookup'\n"
+                        + ")");
+        tEnv.executeSql(
+                        "INSERT INTO db2.t3 VALUES "
+                                + "(1, 1, 'one'), "
+                                + "(1, 2, 'two'), "
+                                + "(2, 1, 'apple')")
+                .await();
+        tEnv.executeSql(
+                        "INSERT INTO db2.t3 VALUES "
+                                + "(1, 2, 'twenty'), "
+                                + "(2, 1, 'banana'), "
+                                + "(2, 2, 'orange')")
+                .await();
+
+        // prepare data: db2.t4
+        tEnv.executeSql(
+                "CREATE TABLE db2.t4 (\n"
+                        + " k INT,\n"
+                        + " v STRING,\n"
+                        + " PRIMARY KEY (k) NOT ENFORCED\n"
+                        + ") WITH (\n"
+                        + " 'changelog-producer' = 'lookup'\n"
+                        + ")");
+        tEnv.executeSql(
+                        "INSERT INTO db2.t4 VALUES "
+                                + "(10, 'one'), "
+                                + "(20, 'two'), "
+                                + "(100, 'apple')")
+                .await();
+        tEnv.executeSql(
+                        "INSERT INTO db2.t4 VALUES "
+                                + "(20, 'twenty'), "
+                                + "(100, 'banana'), "
+                                + "(200, 'orange')")
+                .await();
+    }
+
+    /** Clones a table whose schema was altered between two writes. */
+    @Test
+    public void testCloneWithSchemaEvolution() throws Exception {
+        String sourceWarehouse = getTempDirPath("source-ware");
+        TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
+        createAndUseCatalog(tEnv, "sourcecat", sourceWarehouse);
+
+        tEnv.executeSql(
+                "CREATE TABLE t (\n"
+                        + " pt STRING,\n"
+                        + " k INT,\n"
+                        + " v INT,\n"
+                        + " PRIMARY KEY (pt, k) NOT ENFORCED\n"
+                        + ") PARTITIONED BY (pt) WITH (\n"
+                        + " 'changelog-producer' = 'lookup'\n"
+                        + ")");
+        tEnv.executeSql(
+                        "INSERT INTO t VALUES "
+                                + "('one', 1, 10), "
+                                + "('one', 2, 20), "
+                                + "('two', 1, 100)")
+                .await();
+        tEnv.executeSql("ALTER TABLE t ADD v2 STRING AFTER v");
+        tEnv.executeSql(
+                        "INSERT INTO t VALUES "
+                                + "('one', 2, 21, 'apple'), "
+                                + "('two', 1, 101, 'banana'), "
+                                + "('two', 2, 200, 'orange')")
+                .await();
+
+        String targetWarehouse = getTempDirPath("target-ware");
+        String[] args =
+                new String[] {
+                    "clone", "--warehouse", sourceWarehouse, "--target_warehouse", targetWarehouse
+                };
+        ActionFactory.createAction(args).get().run();
+
+        // check result
+        createAndUseCatalog(tEnv, "targetcat", targetWarehouse);
+
+        List<String> actual = collect(tEnv, "SELECT pt, k, v, v2 FROM t ORDER BY pt, k");
+        assertThat(actual)
+                .containsExactly(
+                        "+I[one, 1, 10, null]",
+                        "+I[one, 2, 21, apple]",
+                        "+I[two, 1, 101, banana]",
+                        "+I[two, 2, 200, orange]");
+        compareCloneFiles(sourceWarehouse, "default", "t", targetWarehouse, "default", "t");
+    }
+
+    /**
+     * Checks that every file used by the latest snapshot of the target table also exists at the
+     * same relative location in the source table and has the same size.
+     */
+    private void compareCloneFiles(
+            String sourceWarehouse,
+            String sourceDb,
+            String sourceTableName,
+            String targetWarehouse,
+            String targetDb,
+            String targetTableName)
+            throws Exception {
+        FileStoreTable targetTable = getFileStoreTable(targetWarehouse, targetDb, targetTableName);
+        List<Path> targetTableFiles = PickFilesUtil.getUsedFilesForLatestSnapshot(targetTable);
+        // left: absolute path in the target table, right: path relative to the table root
+        List<Pair<Path, Path>> filesPathInfoList =
+                targetTableFiles.stream()
+                        .map(
+                                absolutePath ->
+                                        Pair.of(
+                                                absolutePath,
+                                                getPathExcludeTableRoot(
+                                                        absolutePath, targetTable.location())))
+                        .collect(Collectors.toList());
+
+        FileStoreTable sourceTable = getFileStoreTable(sourceWarehouse, sourceDb, sourceTableName);
+        Path tableLocation = sourceTable.location();
+        for (Pair<Path, Path> filesPathInfo : filesPathInfoList) {
+            Path sourceTableFile = new Path(tableLocation.toString() + filesPathInfo.getRight());
+            assertThat(sourceTable.fileIO().exists(sourceTableFile)).isTrue();
+            assertThat(targetTable.fileIO().getFileSize(filesPathInfo.getLeft()))
+                    .isEqualTo(sourceTable.fileIO().getFileSize(sourceTableFile));
+        }
+    }
+
+    /** Strips the table root prefix from an absolute file path. */
+    private Path getPathExcludeTableRoot(Path absolutePath, Path sourceTableRoot) {
+        String fileAbsolutePath = absolutePath.toUri().toString();
+        String sourceTableRootPath = sourceTableRoot.toString();
+
+        checkState(
+                fileAbsolutePath.startsWith(sourceTableRootPath),
+                "This is a bug, please report. fileAbsolutePath is : "
+                        + fileAbsolutePath
+                        + ", sourceTableRootPath is : "
+                        + sourceTableRootPath);
+
+        return new Path(fileAbsolutePath.substring(sourceTableRootPath.length()));
+    }
+
+    /** Loads a table from a catalog created on {@code warehouse}; the catalog is closed again. */
+    private FileStoreTable getFileStoreTable(String warehouse, String db, String tableName)
+            throws Exception {
+        try (Catalog catalog =
+                CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse)))) {
+            return (FileStoreTable) catalog.getTable(Identifier.create(db, tableName));
+        }
+    }
+
+    // ------------------------------------------------------------------------
+    //  Random Tests
+    // ------------------------------------------------------------------------
+
+    /**
+     * Clones a table while a background thread keeps overwriting it under a very aggressive
+     * snapshot expiration.
+     */
+    @Test
+    @Timeout(180)
+    public void testCloneTableWithExpiration() throws Exception {
+        String sourceWarehouse = getTempDirPath("source-ware");
+
+        TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().parallelism(1).build();
+        createAndUseCatalog(tEnv, "sourcecat", sourceWarehouse);
+        tEnv.executeSql(
+                "CREATE TABLE t (\n"
+                        + " pt INT,\n"
+                        + " k INT,\n"
+                        + " v INT,\n"
+                        + " PRIMARY KEY (pt, k) NOT ENFORCED\n"
+                        + ") PARTITIONED BY (pt) WITH (\n"
+                        + " 'changelog-producer' = 'lookup',\n"
+                        // very fast expiration
+                        + " 'snapshot.num-retained.min' = '1',\n"
+                        + " 'snapshot.num-retained.max' = '1',\n"
+                        + " 'write-buffer-size' = '256 kb'\n"
+                        + ")");
+
+        int numPartitions = 3;
+        int numKeysPerPartition = 10000;
+        tEnv.executeSql(
+                "CREATE TEMPORARY TABLE s (\n"
+                        + " a INT\n"
+                        + ") WITH (\n"
+                        + " 'connector' = 'datagen',\n"
+                        + " 'fields.a.kind' = 'sequence',\n"
+                        + " 'fields.a.start' = '0',\n"
+                        + String.format(
+                                " 'fields.a.end' = '%d',\n",
+                                numPartitions * numKeysPerPartition - 1)
+                        + String.format(
+                                " 'number-of-rows' = '%d'\n", numPartitions * numKeysPerPartition)
+                        + ")");
+
+        // write initial data
+        tEnv.executeSql(
+                        String.format(
+                                "INSERT INTO t SELECT (a / %d) AS pt, (a %% %d) AS k, 0 AS v FROM s",
+                                numKeysPerPartition, numKeysPerPartition))
+                .await();
+
+        AtomicBoolean running = new AtomicBoolean(true);
+        Runnable runnable =
+                () -> {
+                    int rounds = 1;
+                    while (running.get()) {
+                        String sql =
+                                String.format(
+                                        "INSERT INTO t SELECT (a / %d) AS pt, (a %% %d) AS k, %d AS v FROM s",
+                                        numKeysPerPartition, numKeysPerPartition, rounds);
+                        try {
+                            tEnv.executeSql(sql).await();
+                            // Sleeping time will become longer and longer, so expiration time will
+                            // also become longer.
+                            // Thus, at the beginning of the test, clone job is very likely to fail
+                            // due to FileNotFoundException.
+                            // However, as the test progresses further, clone job should be able to
+                            // complete due to longer expiration time.
+                            Thread.sleep(100L << Math.min(rounds, 9));
+                        } catch (Exception e) {
+                            throw new RuntimeException(e);
+                        }
+                        rounds++;
+                    }
+                };
+        String targetWarehouse = getTempDirPath("target-ware");
+        Thread thread = new Thread(runnable);
+        thread.start();
+
+        try {
+            Thread.sleep(ThreadLocalRandom.current().nextInt(2000));
+            String[] args =
+                    new String[] {
+                        "clone",
+                        "--warehouse",
+                        // special file io to make cloning slower, thus more likely to face
+                        // FileNotFoundException, see CloneActionSlowFileIO
+                        "clone-slow://" + sourceWarehouse,
+                        "--target_warehouse",
+                        "clone-slow://" + targetWarehouse,
+                        "--parallelism",
+                        "1"
+                    };
+            CloneAction action = (CloneAction) ActionFactory.createAction(args).get();
+
+            StreamExecutionEnvironment env =
+                    streamExecutionEnvironmentBuilder().streamingMode().allowRestart().build();
+            action.withStreamExecutionEnvironment(env).build();
+            env.execute();
+        } finally {
+            // Always stop the background writer, even when the clone job fails; otherwise the
+            // thread would keep writing after the test has ended.
+            running.set(false);
+            thread.join();
+        }
+
+        // check result
+        createAndUseCatalog(tEnv, "targetcat", targetWarehouse);
+        assertThat(collect(tEnv, "SELECT pt, COUNT(*) FROM t GROUP BY pt ORDER BY pt"))
+                .isEqualTo(
+                        IntStream.range(0, numPartitions)
+                                .mapToObj(i -> String.format("+I[%d, %d]", i, numKeysPerPartition))
+                                .collect(Collectors.toList()));
+        assertThat(collect(tEnv, "SELECT COUNT(DISTINCT v) FROM t"))
+                .isEqualTo(Collections.singletonList("+I[1]"));
+    }
+
+    // ------------------------------------------------------------------------
+    //  Utils
+    // ------------------------------------------------------------------------
+
+    /** Registers a Paimon catalog on {@code warehouse} under {@code catalogName} and selects it. */
+    private void createAndUseCatalog(TableEnvironment tEnv, String catalogName, String warehouse) {
+        tEnv.executeSql(
+                "CREATE CATALOG "
+                        + catalogName
+                        + " WITH (\n"
+                        + " 'type' = 'paimon',\n"
+                        + String.format(" 'warehouse' = '%s'\n", warehouse)
+                        + ")");
+        tEnv.executeSql("USE CATALOG " + catalogName);
+    }
+
+    /** Executes {@code sql} and returns the string form of every result row. */
+    private List<String> collect(TableEnvironment tEnv, String sql) throws Exception {
+        List<String> actual = new ArrayList<>();
+        try (CloseableIterator<Row> it = tEnv.executeSql(sql).collect()) {
+            while (it.hasNext()) {
+                Row row = it.next();
+                actual.add(row.toString());
+            }
+        }
+        return actual;
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CloneActionSlowFileIO.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CloneActionSlowFileIO.java
new file mode 100644
index 000000000..bc703699b
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CloneActionSlowFileIO.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileIOLoader;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.fs.PositionOutputStreamWrapper;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.fs.SeekableInputStreamWrapper;
+import org.apache.paimon.fs.local.LocalFileIO;
+
+import java.io.IOException;
+
+/**
+ * Special {@link FileIO} for {@link CloneActionITCase}. It will sleep before
cloning a file, thus
+ * making it slower.
+ */
+public class CloneActionSlowFileIO extends LocalFileIO {
+
+ public static final String SCHEME = "clone-slow";
+ private static final int SLEEP_MILLIS = 150;
+
+ @Override
+ public SeekableInputStream newInputStream(Path f) throws IOException {
+ return new SlowSeekableInputStreamWrapper(super.newInputStream(f));
+ }
+
+ @Override
+ public PositionOutputStream newOutputStream(Path filePath, boolean
overwrite)
+ throws IOException {
+ return new
SlowPositionOutputStreamWrapper(super.newOutputStream(filePath, overwrite));
+ }
+
+ private static boolean checkStackTrace() {
+ for (StackTraceElement layer : Thread.currentThread().getStackTrace())
{
+ if (layer.getMethodName().contains("copyBytes")) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private static class SlowSeekableInputStreamWrapper extends
SeekableInputStreamWrapper {
+
+ public SlowSeekableInputStreamWrapper(SeekableInputStream inputStream)
{
+ super(inputStream);
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ if (checkStackTrace()) {
+ try {
+ Thread.sleep(SLEEP_MILLIS);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return super.read(b, off, len);
+ }
+ }
+
+ private static class SlowPositionOutputStreamWrapper extends
PositionOutputStreamWrapper {
+
+ public SlowPositionOutputStreamWrapper(PositionOutputStream
outputStream) {
+ super(outputStream);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ if (checkStackTrace()) {
+ try {
+ Thread.sleep(SLEEP_MILLIS);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ super.write(b, off, len);
+ }
+ }
+
+ /** Loader for {@link CloneActionSlowFileIO}. */
+ public static class Loader implements FileIOLoader {
+
+ @Override
+ public String getScheme() {
+ return SCHEME;
+ }
+
+ @Override
+ public FileIO load(Path path) {
+ return new CloneActionSlowFileIO();
+ }
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.fs.FileIOLoader
b/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.fs.FileIOLoader
new file mode 100644
index 000000000..971a2f8c4
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.fs.FileIOLoader
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.paimon.flink.action.CloneActionSlowFileIO$Loader