This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 97aa22663 [flink] Support cloning table from latest snapshot (#3287)
97aa22663 is described below

commit 97aa2266394ccd0a248675e97f880c9e9ef29c54
Author: wangwj <[email protected]>
AuthorDate: Tue Jun 4 13:54:52 2024 +0800

    [flink] Support cloning table from latest snapshot (#3287)
    
    This closes #3287.
---
 docs/content/migration/clone-tables.md             |  82 ++++
 .../main/java/org/apache/paimon/schema/Schema.java |   9 +
 .../apache/paimon/flink/action/CloneAction.java    | 150 ++++++
 .../paimon/flink/action/CloneActionFactory.java    |  89 ++++
 .../apache/paimon/flink/clone/CloneFileInfo.java   |  53 ++
 .../paimon/flink/clone/CloneSourceBuilder.java     | 122 +++++
 .../paimon/flink/clone/CopyFileOperator.java       | 126 +++++
 .../flink/clone/PickFilesForCloneOperator.java     | 136 ++++++
 .../apache/paimon/flink/clone/PickFilesUtil.java   | 206 ++++++++
 .../flink/clone/SnapshotHintChannelComputer.java   |  49 ++
 .../paimon/flink/clone/SnapshotHintOperator.java   | 101 ++++
 .../services/org.apache.paimon.factories.Factory   |   1 +
 .../paimon/flink/action/CloneActionITCase.java     | 532 +++++++++++++++++++++
 .../paimon/flink/action/CloneActionSlowFileIO.java | 112 +++++
 .../services/org.apache.paimon.fs.FileIOLoader     |  16 +
 15 files changed, 1784 insertions(+)

diff --git a/docs/content/migration/clone-tables.md 
b/docs/content/migration/clone-tables.md
new file mode 100644
index 000000000..871baee6f
--- /dev/null
+++ b/docs/content/migration/clone-tables.md
@@ -0,0 +1,82 @@
+---
+title: "Clone Tables"
+weight: 3
+type: docs
+aliases:
+- /migration/clone-tables.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Clone Tables
+
+Paimon supports cloning tables for data migration.
+Currently, only table files used by the latest snapshot will be cloned.
+
+To clone a table, run the following command to submit a clone job.
+If the table is not being modified while it is cloned, submitting a Flink batch job is recommended for better performance.
+If you want to clone the table while it is still being written to, submit a Flink streaming job instead, so that the clone job recovers automatically from failures.
+
+```bash
+<FLINK_HOME>/bin/flink run \
+    /path/to/paimon-flink-action-{{< version >}}.jar \
+    clone \
+    --warehouse <source-warehouse-path> \
+    [--database <source-database-name>] \
+    [--table <source-table-name>] \
+    [--catalog_conf <source-paimon-catalog-conf> [--catalog_conf 
<source-paimon-catalog-conf> ...]] \
+    --target_warehouse <target-warehouse-path> \
+    [--target_database <target-database>] \
+    [--target_table <target-table-name>] \
+    [--target_catalog_conf <target-paimon-catalog-conf> [--target_catalog_conf <target-paimon-catalog-conf> ...]] \
+    [--parallelism <parallelism>]
+```
+
+{{< hint info >}}
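+1. If `database` is not specified, all tables in all databases of the specified warehouse will be cloned, keeping their database and table names. In this case, `table`, `target_database` and `target_table` must not be specified.
+2. If `table` is not specified, all tables of the specified database will be cloned into `target_database`, which must be specified; `target_table` must not be specified.
+3. If both `database` and `table` are specified, both `target_database` and `target_table` must be specified.
+4. If `parallelism` is not specified, the default parallelism of the Flink job is used for copying files.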
+{{< /hint >}}
+
+Example: Clone `test_db.test_table` from source warehouse to target warehouse.
+
+```bash
+<FLINK_HOME>/bin/flink run \
+    /path/to/paimon-flink-action-{{< version >}}.jar \
+    clone \
+    --warehouse s3:///path/to/warehouse_source \
+    --database test_db \
+    --table test_table \
+    --catalog_conf s3.endpoint=https://****.com \
+    --catalog_conf s3.access-key=***** \
+    --catalog_conf s3.secret-key=***** \
+    --target_warehouse s3:///path/to/warehouse_target \
+    --target_database test_db \
+    --target_table test_table \
+    --target_catalog_conf s3.endpoint=https://****.com \
+    --target_catalog_conf s3.access-key=***** \
+    --target_catalog_conf s3.secret-key=*****
+```
+
+For more information about the clone action, run
+
+```bash
+<FLINK_HOME>/bin/flink run \
+    /path/to/paimon-flink-action-{{< version >}}.jar \
+    clone --help
+```
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java
index 824dff5c4..b75758374 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java
@@ -329,4 +329,13 @@ public class Schema {
             return new Schema(columns, partitionKeys, primaryKeys, options, 
comment);
         }
     }
+
+    /**
+     * Creates a {@link Schema} from an existing {@link TableSchema} by passing its fields,
+     * partition keys, primary keys, options and comment to the {@link Schema} constructor.
+     *
+     * @param tableSchema the table schema to convert
+     * @return a new {@code Schema} built from {@code tableSchema}
+     */
+    public static Schema fromTableSchema(TableSchema tableSchema) {
+        return new Schema(
+                tableSchema.fields(),
+                tableSchema.partitionKeys(),
+                tableSchema.primaryKeys(),
+                tableSchema.options(),
+                tableSchema.comment());
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneAction.java
new file mode 100644
index 000000000..caa676661
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneAction.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.flink.clone.CloneFileInfo;
+import org.apache.paimon.flink.clone.CloneSourceBuilder;
+import org.apache.paimon.flink.clone.CopyFileOperator;
+import org.apache.paimon.flink.clone.PickFilesForCloneOperator;
+import org.apache.paimon.flink.clone.SnapshotHintChannelComputer;
+import org.apache.paimon.flink.clone.SnapshotHintOperator;
+import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
+import org.apache.paimon.options.CatalogOptions;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
+import static org.apache.paimon.utils.StringUtils.isBlank;
+
+/**
+ * Flink action that clones the files used by the latest snapshot of one table, of all tables in a
+ * database, or of all tables in a catalog from a source warehouse into a target warehouse.
+ *
+ * <p>The job consists of a non-parallel source emitting (source, target) table identifier pairs, a
+ * non-parallel operator picking the files to clone, a parallel file-copy operator and an operator
+ * recreating the snapshot hint files of the target tables.
+ */
+public class CloneAction extends ActionBase {
+
+    private final int parallelism;
+
+    private final Map<String, String> sourceCatalogConfig;
+    private final String database;
+    private final String tableName;
+
+    private final Map<String, String> targetCatalogConfig;
+    private final String targetDatabase;
+    private final String targetTableName;
+
+    /**
+     * Creates a clone action.
+     *
+     * @param warehouse warehouse path of the source catalog, must not be null
+     * @param database source database, or null to clone all databases of the source catalog
+     * @param tableName source table, or null to clone all tables of {@code database}
+     * @param sourceCatalogConfig extra source catalog options; this class works on a copy of it
+     * @param targetWarehouse warehouse path of the target catalog, must not be null
+     * @param targetDatabase target database; {@link CloneSourceBuilder} validates when it is needed
+     * @param targetTableName target table; {@link CloneSourceBuilder} validates when it is needed
+     * @param targetCatalogConfig extra target catalog options; this class works on a copy of it
+     * @param parallelismStr parallelism of the copy stages, or blank to use the environment default
+     */
+    public CloneAction(
+            String warehouse,
+            String database,
+            String tableName,
+            Map<String, String> sourceCatalogConfig,
+            String targetWarehouse,
+            String targetDatabase,
+            String targetTableName,
+            Map<String, String> targetCatalogConfig,
+            String parallelismStr) {
+        super(warehouse, sourceCatalogConfig);
+
+        checkNotNull(warehouse, "warehouse must not be null.");
+        checkNotNull(targetWarehouse, "targetWarehouse must not be null.");
+
+        this.parallelism =
+                isBlank(parallelismStr) ? env.getParallelism() : Integer.parseInt(parallelismStr);
+
+        this.sourceCatalogConfig = catalogConfigWithWarehouse(sourceCatalogConfig, warehouse);
+        this.database = database;
+        this.tableName = tableName;
+
+        this.targetCatalogConfig =
+                catalogConfigWithWarehouse(targetCatalogConfig, targetWarehouse);
+        this.targetDatabase = targetDatabase;
+        this.targetTableName = targetTableName;
+    }
+
+    /**
+     * Returns a new {@link HashMap} holding the entries of {@code catalogConfig} (none when it is
+     * null) plus the warehouse option. Copying keeps the caller's map untouched (so immutable maps
+     * are accepted) and always hands a {@link HashMap} to the operators that receive the config.
+     */
+    private static Map<String, String> catalogConfigWithWarehouse(
+            Map<String, String> catalogConfig, String warehouse) {
+        Map<String, String> config = new HashMap<>();
+        if (catalogConfig != null) {
+            config.putAll(catalogConfig);
+        }
+        config.put(CatalogOptions.WAREHOUSE.key(), warehouse);
+        return config;
+    }
+
+    // ------------------------------------------------------------------------
+    //  Java API
+    // ------------------------------------------------------------------------
+
+    @Override
+    public void build() {
+        try {
+            buildCloneFlinkJob(env);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** Adds the clone pipeline (source, pick, copy, snapshot hint, discarding sink) to {@code env}. */
+    private void buildCloneFlinkJob(StreamExecutionEnvironment env) throws Exception {
+        DataStream<Tuple2<String, String>> cloneSource =
+                new CloneSourceBuilder(
+                                env,
+                                sourceCatalogConfig,
+                                database,
+                                tableName,
+                                targetDatabase,
+                                targetTableName)
+                        .build();
+
+        SingleOutputStreamOperator<CloneFileInfo> pickFilesForClone =
+                cloneSource
+                        .transform(
+                                "Pick Files",
+                                TypeInformation.of(CloneFileInfo.class),
+                                new PickFilesForCloneOperator(
+                                        sourceCatalogConfig, targetCatalogConfig))
+                        .forceNonParallel();
+
+        // Rebalance so that the picked files are spread over all copy subtasks.
+        SingleOutputStreamOperator<CloneFileInfo> copyFiles =
+                pickFilesForClone
+                        .rebalance()
+                        .transform(
+                                "Copy Files",
+                                TypeInformation.of(CloneFileInfo.class),
+                                new CopyFileOperator(sourceCatalogConfig, targetCatalogConfig))
+                        .setParallelism(parallelism);
+
+        SingleOutputStreamOperator<CloneFileInfo> snapshotHintOperator =
+                FlinkStreamPartitioner.partition(
+                                copyFiles, new SnapshotHintChannelComputer(), parallelism)
+                        .transform(
+                                "Recreate Snapshot Hint",
+                                TypeInformation.of(CloneFileInfo.class),
+                                new SnapshotHintOperator(targetCatalogConfig))
+                        .setParallelism(parallelism);
+
+        snapshotHintOperator.addSink(new DiscardingSink<>()).name("end").setParallelism(1);
+    }
+
+    @Override
+    public void run() throws Exception {
+        build();
+        execute("Clone job");
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneActionFactory.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneActionFactory.java
new file mode 100644
index 000000000..db45c8508
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneActionFactory.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import java.util.Optional;
+
+/** Factory to create {@link CloneAction} from command line arguments of the "clone" action. */
+public class CloneActionFactory implements ActionFactory {
+
+    private static final String IDENTIFIER = "clone";
+    private static final String PARALLELISM = "parallelism";
+    private static final String TARGET_WAREHOUSE = "target_warehouse";
+    private static final String TARGET_DATABASE = "target_database";
+    private static final String TARGET_TABLE = "target_table";
+    private static final String TARGET_CATALOG_CONF = "target_catalog_conf";
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+
+    /**
+     * Builds a {@link CloneAction} from the parsed parameters. Argument combinations are validated
+     * later by the action itself, not here.
+     */
+    @Override
+    public Optional<Action> create(MultipleParameterToolAdapter params) {
+        CloneAction cloneAction =
+                new CloneAction(
+                        params.get(WAREHOUSE),
+                        params.get(DATABASE),
+                        params.get(TABLE),
+                        optionalConfigMap(params, CATALOG_CONF),
+                        params.get(TARGET_WAREHOUSE),
+                        params.get(TARGET_DATABASE),
+                        params.get(TARGET_TABLE),
+                        optionalConfigMap(params, TARGET_CATALOG_CONF),
+                        params.get(PARALLELISM));
+
+        return Optional.of(cloneAction);
+    }
+
+    @Override
+    public void printHelp() {
+        System.out.println("Action \"clone\" runs a job to clone tables from their latest snapshot.");
+        System.out.println();
+
+        System.out.println("Syntax:");
+        System.out.println(
+                "  clone --warehouse <warehouse_path> "
+                        + "[--database <database_name>] "
+                        + "[--table <table_name>] "
+                        + "[--catalog_conf <source-paimon-catalog-conf> [--catalog_conf <source-paimon-catalog-conf> ...]] "
+                        + "--target_warehouse <target_warehouse_path> "
+                        + "[--target_database <target_database_name>] "
+                        + "[--target_table <target_table_name>] "
+                        + "[--target_catalog_conf <target-paimon-catalog-conf> [--target_catalog_conf <target-paimon-catalog-conf> ...]] "
+                        + "[--parallelism <parallelism>]");
+
+        System.out.println();
+
+        // These rules mirror the argument checks performed when the clone source is built.
+        System.out.println(
+                "If --database is not specified, all tables of all databases are cloned with "
+                        + "their original names; --table, --target_database and --target_table "
+                        + "must not be specified.");
+        System.out.println(
+                "If --table is not specified, all tables of --database are cloned into "
+                        + "--target_database; --target_table must not be specified.");
+        System.out.println(
+                "If --parallelism is not specified, the default parallelism of the job is used.");
+
+        System.out.println();
+
+        System.out.println("Examples:");
+        System.out.println(
+                "  clone --warehouse s3:///path1/from/warehouse "
+                        + "--database test_db "
+                        + "--table test_table "
+                        + "--catalog_conf s3.endpoint=https://****.com "
+                        + "--catalog_conf s3.access-key=***** "
+                        + "--catalog_conf s3.secret-key=***** "
+                        + "--target_warehouse s3:///path2/to/warehouse "
+                        + "--target_database test_db_copy "
+                        + "--target_table test_table_copy "
+                        + "--target_catalog_conf s3.endpoint=https://****.com "
+                        + "--target_catalog_conf s3.access-key=***** "
+                        + "--target_catalog_conf s3.secret-key=***** ");
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneFileInfo.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneFileInfo.java
new file mode 100644
index 000000000..d91695841
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneFileInfo.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.clone;
+
+/**
+ * Describes one file to clone: the file path below the table root, the identifier of the source
+ * table it is read from and the identifier of the target table it is written to.
+ */
+public class CloneFileInfo {
+
+    /** Path suffix of the file, appended to the table root path of the source and target table. */
+    private final String filePathExcludeTableRoot;
+
+    /** Identifier (as a string) of the table the file is copied from. */
+    private final String sourceIdentifier;
+
+    /** Identifier (as a string) of the table the file is copied to. */
+    private final String targetIdentifier;
+
+    public CloneFileInfo(
+            String filePathExcludeTableRoot, String sourceIdentifier, String targetIdentifier) {
+        this.filePathExcludeTableRoot = filePathExcludeTableRoot;
+        this.sourceIdentifier = sourceIdentifier;
+        this.targetIdentifier = targetIdentifier;
+    }
+
+    /** Returns the file path relative to the table root. */
+    public String getFilePathExcludeTableRoot() {
+        return filePathExcludeTableRoot;
+    }
+
+    /** Returns the identifier of the source table. */
+    public String getSourceIdentifier() {
+        return sourceIdentifier;
+    }
+
+    /** Returns the identifier of the target table. */
+    public String getTargetIdentifier() {
+        return targetIdentifier;
+    }
+
+    @Override
+    public String toString() {
+        return "{ filePath: "
+                + filePathExcludeTableRoot
+                + ", sourceIdentifier: "
+                + sourceIdentifier
+                + ", targetIdentifier: "
+                + targetIdentifier
+                + " }";
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneSourceBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneSourceBuilder.java
new file mode 100644
index 000000000..db0452873
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneSourceBuilder.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.clone;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.flink.FlinkCatalogFactory;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.utils.StringUtils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/**
+ * Picks the tables to clone based on the user input parameters. Each record of the built
+ * DataStream is a {@link Tuple2} whose first field is the identifier of a source table and whose
+ * second field is the identifier of the corresponding target table.
+ */
+public class CloneSourceBuilder {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CloneSourceBuilder.class);
+
+    private final StreamExecutionEnvironment env;
+    private final Map<String, String> sourceCatalogConfig;
+    private final String database;
+    private final String tableName;
+    private final String targetDatabase;
+    private final String targetTableName;
+
+    public CloneSourceBuilder(
+            StreamExecutionEnvironment env,
+            Map<String, String> sourceCatalogConfig,
+            String database,
+            String tableName,
+            String targetDatabase,
+            String targetTableName) {
+        this.env = env;
+        this.sourceCatalogConfig = sourceCatalogConfig;
+        this.database = database;
+        this.tableName = tableName;
+        this.targetDatabase = targetDatabase;
+        this.targetTableName = targetTableName;
+    }
+
+    /**
+     * Opens the source catalog, resolves the (source, target) identifier pairs and closes the
+     * catalog again.
+     *
+     * @return a non-parallel stream of (source identifier, target identifier) pairs
+     * @throws IllegalArgumentException if the database/table arguments are inconsistent or no
+     *     table to clone is found
+     */
+    public DataStream<Tuple2<String, String>> build() throws Exception {
+        try (Catalog sourceCatalog =
+                FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(sourceCatalogConfig))) {
+            return build(sourceCatalog);
+        }
+    }
+
+    private DataStream<Tuple2<String, String>> build(Catalog sourceCatalog) throws Exception {
+        List<Tuple2<String, String>> result = new ArrayList<>();
+
+        if (database == null) {
+            // Clone every table of the catalog, keeping database and table names.
+            checkArgument(
+                    StringUtils.isBlank(tableName),
+                    "tableName must be blank when database is null.");
+            checkArgument(
+                    StringUtils.isBlank(targetDatabase),
+                    "targetDatabase must be blank when clone all tables in a catalog.");
+            checkArgument(
+                    StringUtils.isBlank(targetTableName),
+                    "targetTableName must be blank when clone all tables in a catalog.");
+            for (String db : sourceCatalog.listDatabases()) {
+                for (String table : sourceCatalog.listTables(db)) {
+                    String s = db + "." + table;
+                    result.add(new Tuple2<>(s, s));
+                }
+            }
+        } else if (tableName == null) {
+            // Clone every table of one database into targetDatabase, keeping table names.
+            checkArgument(
+                    !StringUtils.isBlank(targetDatabase),
+                    "targetDatabase must not be blank when clone all tables in a database.");
+            checkArgument(
+                    StringUtils.isBlank(targetTableName),
+                    "targetTableName must be blank when clone all tables in a database.");
+            for (String table : sourceCatalog.listTables(database)) {
+                result.add(new Tuple2<>(database + "." + table, targetDatabase + "." + table));
+            }
+        } else {
+            // Clone a single table under an explicit target name.
+            checkArgument(
+                    !StringUtils.isBlank(targetDatabase),
+                    "targetDatabase must not be blank when clone a table.");
+            checkArgument(
+                    !StringUtils.isBlank(targetTableName),
+                    "targetTableName must not be blank when clone a table.");
+            result.add(
+                    new Tuple2<>(
+                            database + "." + tableName, targetDatabase + "." + targetTableName));
+        }
+
+        // Flink's fromCollection rejects an empty collection with an opaque message; fail early
+        // with one that says what was searched.
+        checkArgument(
+                !result.isEmpty(),
+                "No table found to clone (database: %s, table: %s).",
+                database,
+                tableName);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("The clone identifiers of source table and target table are: {}", result);
+        }
+        return env.fromCollection(result).forceNonParallel().forward();
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java
new file mode 100644
index 000000000..5b320f4e9
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.clone;
+
+import org.apache.paimon.catalog.AbstractCatalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.FlinkCatalogFactory;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.utils.IOUtils;
+
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Map;
+
+/**
+ * An operator that copies, for each input record, one file from the source table to the target
+ * table, skipping it when the target file already exists with the same size. Every input record
+ * is forwarded downstream, whether it was copied or skipped.
+ */
+public class CopyFileOperator extends AbstractStreamOperator<CloneFileInfo>
+        implements OneInputStreamOperator<CloneFileInfo, CloneFileInfo> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CopyFileOperator.class);
+
+    private final Map<String, String> sourceCatalogConfig;
+    private final Map<String, String> targetCatalogConfig;
+
+    // Created in open() and released in close().
+    private AbstractCatalog sourceCatalog;
+    private AbstractCatalog targetCatalog;
+
+    public CopyFileOperator(
+            Map<String, String> sourceCatalogConfig, Map<String, String> targetCatalogConfig) {
+        this.sourceCatalogConfig = sourceCatalogConfig;
+        this.targetCatalogConfig = targetCatalogConfig;
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        // Assumes both configs yield an AbstractCatalog, needed for fileIO() and
+        // getDataTableLocation(); the cast fails otherwise.
+        sourceCatalog =
+                (AbstractCatalog)
+                        FlinkCatalogFactory.createPaimonCatalog(
+                                Options.fromMap(sourceCatalogConfig));
+        targetCatalog =
+                (AbstractCatalog)
+                        FlinkCatalogFactory.createPaimonCatalog(
+                                Options.fromMap(targetCatalogConfig));
+    }
+
+    @Override
+    public void processElement(StreamRecord<CloneFileInfo> streamRecord) throws Exception {
+        CloneFileInfo cloneFileInfo = streamRecord.getValue();
+
+        FileIO sourceTableFileIO = sourceCatalog.fileIO();
+        FileIO targetTableFileIO = targetCatalog.fileIO();
+        Path sourceTableRootPath =
+                sourceCatalog.getDataTableLocation(
+                        Identifier.fromString(cloneFileInfo.getSourceIdentifier()));
+        Path targetTableRootPath =
+                targetCatalog.getDataTableLocation(
+                        Identifier.fromString(cloneFileInfo.getTargetIdentifier()));
+
+        // The relative path is appended verbatim to both table roots.
+        String filePathExcludeTableRoot = cloneFileInfo.getFilePathExcludeTableRoot();
+        Path sourcePath = new Path(sourceTableRootPath + filePathExcludeTableRoot);
+        Path targetPath = new Path(targetTableRootPath + filePathExcludeTableRoot);
+
+        if (targetTableFileIO.exists(targetPath)
+                && targetTableFileIO.getFileSize(targetPath)
+                        == sourceTableFileIO.getFileSize(sourcePath)) {
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(
+                        "Skipping clone target file {} because it already exists and has the same size.",
+                        targetPath);
+            }
+
+            // We still send record to SnapshotHintOperator to avoid the following corner case:
+            //
+            // When cloning two tables under a catalog, after clone table A is completed,
+            // the job fails due to snapshot expiration when cloning table B.
+            // If we don't re-send file information of table A to SnapshotHintOperator,
+            // the snapshot hint file of A will not be created after the restart.
+            output.collect(streamRecord);
+            return;
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Begin copy file from {} to {}.", sourcePath, targetPath);
+        }
+        InputStream in = sourceTableFileIO.newInputStream(sourcePath);
+        OutputStream out;
+        try {
+            // Overwrite: a target file with a different size is replaced.
+            out = targetTableFileIO.newOutputStream(targetPath, true);
+        } catch (Exception e) {
+            // Do not leak the source stream when the target stream cannot be created.
+            IOUtils.closeQuietly(in);
+            throw e;
+        }
+        // NOTE(review): IOUtils.copyBytes(in, out) is expected to close both streams when it
+        // finishes or fails (as Flink's IOUtils does) — confirm against Paimon's IOUtils.
+        IOUtils.copyBytes(in, out);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("End copy file from {} to {}.", sourcePath, targetPath);
+        }
+
+        output.collect(streamRecord);
+    }
+
+    /** Closes both catalogs, even if closing the first one fails, then the base operator. */
+    @Override
+    public void close() throws Exception {
+        try {
+            if (sourceCatalog != null) {
+                sourceCatalog.close();
+            }
+        } finally {
+            try {
+                if (targetCatalog != null) {
+                    targetCatalog.close();
+                }
+            } finally {
+                super.close();
+            }
+        }
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesForCloneOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesForCloneOperator.java
new file mode 100644
index 000000000..883d7b06a
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesForCloneOperator.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.clone;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.FlinkCatalogFactory;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.Preconditions;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Picks the files to be cloned for a table based on the input record. Each input record is a
+ * {@code (sourceIdentifier, targetIdentifier)} pair, and for every file used by the latest snapshot
+ * of the source table one {@link CloneFileInfo} describing the file to copy is emitted.
+ */
+public class PickFilesForCloneOperator extends AbstractStreamOperator<CloneFileInfo>
+        implements OneInputStreamOperator<Tuple2<String, String>, CloneFileInfo> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PickFilesForCloneOperator.class);
+
+    private final Map<String, String> sourceCatalogConfig;
+    private final Map<String, String> targetCatalogConfig;
+
+    private Catalog sourceCatalog;
+    private Catalog targetCatalog;
+
+    public PickFilesForCloneOperator(
+            Map<String, String> sourceCatalogConfig, Map<String, String> targetCatalogConfig) {
+        this.sourceCatalogConfig = sourceCatalogConfig;
+        this.targetCatalogConfig = targetCatalogConfig;
+    }
+
+    @Override
+    public void open() throws Exception {
+        sourceCatalog =
+                FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(sourceCatalogConfig));
+        targetCatalog =
+                FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(targetCatalogConfig));
+    }
+
+    /**
+     * Creates the target database and table with the source table's schema (creating them only if
+     * they do not exist yet), then emits one {@link CloneFileInfo} per file used by the latest
+     * snapshot of the source table.
+     */
+    @Override
+    public void processElement(StreamRecord<Tuple2<String, String>> streamRecord) throws Exception {
+        String sourceIdentifierStr = streamRecord.getValue().f0;
+        Identifier sourceIdentifier = Identifier.fromString(sourceIdentifierStr);
+        String targetIdentifierStr = streamRecord.getValue().f1;
+        Identifier targetIdentifier = Identifier.fromString(targetIdentifierStr);
+
+        FileStoreTable sourceTable = (FileStoreTable) sourceCatalog.getTable(sourceIdentifier);
+        targetCatalog.createDatabase(targetIdentifier.getDatabaseName(), true);
+        targetCatalog.createTable(
+                targetIdentifier, Schema.fromTableSchema(sourceTable.schema()), true);
+
+        List<CloneFileInfo> result =
+                toCloneFileInfos(
+                        PickFilesUtil.getUsedFilesForLatestSnapshot(sourceTable),
+                        sourceTable.location(),
+                        sourceIdentifierStr,
+                        targetIdentifierStr);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("The CloneFileInfo of table {} is {} : ", sourceTable.location(), result);
+        }
+
+        for (CloneFileInfo info : result) {
+            output.collect(new StreamRecord<>(info));
+        }
+    }
+
+    /** Converts absolute file paths into {@link CloneFileInfo}s holding table-relative paths. */
+    private List<CloneFileInfo> toCloneFileInfos(
+            List<Path> files,
+            Path sourceTableRoot,
+            String sourceIdentifier,
+            String targetIdentifier) {
+        List<CloneFileInfo> result = new ArrayList<>();
+        for (Path file : files) {
+            Path relativePath = getPathExcludeTableRoot(file, sourceTableRoot);
+            result.add(
+                    new CloneFileInfo(relativePath.toString(), sourceIdentifier, targetIdentifier));
+        }
+        return result;
+    }
+
+    /**
+     * Returns the remainder of {@code absolutePath} after the {@code sourceTableRoot} prefix.
+     *
+     * @throws IllegalStateException if the file does not live under the table root
+     */
+    private Path getPathExcludeTableRoot(Path absolutePath, Path sourceTableRoot) {
+        // Both paths must be rendered the same way. Path#toUri()#toString() percent-encodes
+        // characters such as spaces, while Path#toString() does not, so mixing the two forms
+        // breaks the prefix check for table locations containing such characters.
+        String fileAbsolutePath = absolutePath.toString();
+        String sourceTableRootPath = sourceTableRoot.toString();
+
+        Preconditions.checkState(
+                fileAbsolutePath.startsWith(sourceTableRootPath),
+                "File absolute path does not start with source table root path. This is unexpected. "
+                        + "fileAbsolutePath is: "
+                        + fileAbsolutePath
+                        + ", sourceTableRootPath is: "
+                        + sourceTableRootPath);
+
+        return new Path(fileAbsolutePath.substring(sourceTableRootPath.length()));
+    }
+
+    /**
+     * Releases both catalogs and the operator's own resources, continuing with the remaining ones
+     * even if an earlier close fails.
+     */
+    @Override
+    public void close() throws Exception {
+        try {
+            if (sourceCatalog != null) {
+                sourceCatalog.close();
+            }
+        } finally {
+            try {
+                if (targetCatalog != null) {
+                    targetCatalog.close();
+                }
+            } finally {
+                super.close();
+            }
+        }
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesUtil.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesUtil.java
new file mode 100644
index 000000000..f83b5cf8f
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesUtil.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.clone;
+
+import org.apache.paimon.FileStore;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.index.IndexFileHandler;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.manifest.ManifestFileMeta;
+import org.apache.paimon.manifest.ManifestList;
+import org.apache.paimon.manifest.SimpleFileEntry;
+import org.apache.paimon.operation.FileStoreScan;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.SnapshotManager;
+
+import javax.annotation.Nullable;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/** Util class for getting the paths of the files used by a table's latest snapshot. */
+public class PickFilesUtil {
+
+    private static final int READ_FILE_RETRY_NUM = 3;
+    private static final int READ_FILE_RETRY_INTERVAL = 5;
+
+    /**
+     * Collects the paths of the files needed to reproduce the latest snapshot of {@code table}.
+     *
+     * <p>The result contains the snapshot file and the files it references (see {@link
+     * #getUsedFilesInternal}), followed by every schema file of the table. If the table has no
+     * snapshot, only the schema files are returned.
+     */
+    public static List<Path> getUsedFilesForLatestSnapshot(FileStoreTable table) {
+        FileStore<?> store = table.store();
+        SnapshotManager snapshotManager = store.snapshotManager();
+        Snapshot snapshot = snapshotManager.latestSnapshot();
+        ManifestList manifestList = store.manifestListFactory().create();
+        SchemaManager schemaManager = new SchemaManager(table.fileIO(), table.location());
+        IndexFileHandler indexFileHandler = store.newIndexFileHandler();
+
+        List<Path> files = new ArrayList<>();
+        if (snapshot != null) {
+            files.add(snapshotManager.snapshotPath(snapshot.id()));
+            files.addAll(
+                    getUsedFilesInternal(
+                            snapshot,
+                            store.pathFactory(),
+                            store.newScan(),
+                            manifestList,
+                            indexFileHandler));
+        }
+        for (long id : schemaManager.listAllIds()) {
+            files.add(schemaManager.toSchemaPath(id));
+        }
+        return files;
+    }
+
+    /**
+     * Collects the manifest lists, manifests, data files, index manifest, index files and
+     * statistics file referenced by {@code snapshot}.
+     *
+     * <p>If a manifest list or the index manifest is not found while reading it, everything
+     * collected so far is discarded and an empty list is returned. NOTE(review): presumably this
+     * covers snapshots expiring during the clone; confirm callers expect this partial result.
+     *
+     * @throws RuntimeException wrapping an {@link IOException} that persisted after all retries
+     */
+    private static List<Path> getUsedFilesInternal(
+            Snapshot snapshot,
+            FileStorePathFactory pathFactory,
+            FileStoreScan scan,
+            ManifestList manifestList,
+            IndexFileHandler indexFileHandler) {
+        List<Path> files = new ArrayList<>();
+        addManifestList(files, snapshot, pathFactory);
+
+        try {
+            // try to read manifests
+            List<ManifestFileMeta> manifestFileMetas =
+                    retryReadingFiles(() -> readAllManifestsWithIOException(snapshot, manifestList));
+            if (manifestFileMetas == null) {
+                return Collections.emptyList();
+            }
+            files.addAll(
+                    manifestFileMetas.stream()
+                            .map(ManifestFileMeta::fileName)
+                            .map(pathFactory::toManifestFilePath)
+                            .collect(Collectors.toList()));
+
+            // try to read data files
+            List<Path> dataFiles = new ArrayList<>();
+            List<SimpleFileEntry> simpleFileEntries =
+                    scan.withSnapshot(snapshot).readSimpleEntries();
+            for (SimpleFileEntry simpleFileEntry : simpleFileEntries) {
+                Path dataFilePath =
+                        pathFactory
+                                .createDataFilePathFactory(
+                                        simpleFileEntry.partition(), simpleFileEntry.bucket())
+                                .toPath(simpleFileEntry.fileName());
+                dataFiles.add(dataFilePath);
+            }
+
+            // When scanning, dataFiles are listed from older to newer.
+            // By reversing dataFiles, newer files will be copied first.
+            //
+            // We do this because new files are from the latest partition, and are prone to be
+            // deleted. Older files however, are from previous partitions and should not be changed
+            // very often.
+            Collections.reverse(dataFiles);
+            files.addAll(dataFiles);
+
+            // try to read index files
+            String indexManifest = snapshot.indexManifest();
+            if (indexManifest != null && indexFileHandler.existsManifest(indexManifest)) {
+                files.add(pathFactory.indexManifestFileFactory().toPath(indexManifest));
+
+                List<IndexManifestEntry> indexManifestEntries =
+                        retryReadingFiles(
+                                () -> indexFileHandler.readManifestWithIOException(indexManifest));
+                if (indexManifestEntries == null) {
+                    return Collections.emptyList();
+                }
+
+                indexManifestEntries.stream()
+                        .map(IndexManifestEntry::indexFile)
+                        .map(indexFileHandler::filePath)
+                        .forEach(files::add);
+            }
+
+            // add statistic file
+            if (snapshot.statistics() != null) {
+                files.add(pathFactory.statsFileFactory().toPath(snapshot.statistics()));
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        return files;
+    }
+
+    /** Adds the base, delta and (if present) changelog manifest list paths of the snapshot. */
+    private static void addManifestList(
+            List<Path> used, Snapshot snapshot, FileStorePathFactory pathFactory) {
+        used.add(pathFactory.toManifestListPath(snapshot.baseManifestList()));
+        used.add(pathFactory.toManifestListPath(snapshot.deltaManifestList()));
+        String changelogManifestList = snapshot.changelogManifestList();
+        if (changelogManifestList != null) {
+            used.add(pathFactory.toManifestListPath(changelogManifestList));
+        }
+    }
+
+    /** Reads the manifest metas from the base, delta and (if present) changelog manifest lists. */
+    private static List<ManifestFileMeta> readAllManifestsWithIOException(
+            Snapshot snapshot, ManifestList manifestList) throws IOException {
+        List<ManifestFileMeta> result = new ArrayList<>();
+
+        result.addAll(manifestList.readWithIOException(snapshot.baseManifestList()));
+        result.addAll(manifestList.readWithIOException(snapshot.deltaManifestList()));
+
+        String changelogManifestList = snapshot.changelogManifestList();
+        if (changelogManifestList != null) {
+            result.addAll(manifestList.readWithIOException(changelogManifestList));
+        }
+
+        return result;
+    }
+
+    /**
+     * Runs {@code reader} up to {@link #READ_FILE_RETRY_NUM} times, waiting {@link
+     * #READ_FILE_RETRY_INTERVAL} milliseconds between failed attempts.
+     *
+     * @return the value read, or {@code null} if the file does not exist
+     * @throws IOException the last failure if every attempt failed
+     */
+    @Nullable
+    private static <T> T retryReadingFiles(ReaderWithIOException<T> reader) throws IOException {
+        IOException caught = null;
+        for (int attempt = 1; attempt <= READ_FILE_RETRY_NUM; attempt++) {
+            try {
+                return reader.read();
+            } catch (FileNotFoundException e) {
+                // A missing file is not retried; the caller decides how to handle it.
+                return null;
+            } catch (IOException e) {
+                caught = e;
+            }
+            // Only wait if another attempt follows; after the last one, rethrow immediately.
+            if (attempt < READ_FILE_RETRY_NUM) {
+                try {
+                    TimeUnit.MILLISECONDS.sleep(READ_FILE_RETRY_INTERVAL);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+
+        throw caught;
+    }
+
+    /** A helper functional interface for method {@link #retryReadingFiles}. */
+    @FunctionalInterface
+    interface ReaderWithIOException<T> {
+        T read() throws IOException;
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/SnapshotHintChannelComputer.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/SnapshotHintChannelComputer.java
new file mode 100644
index 000000000..c947ab1e6
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/SnapshotHintChannelComputer.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.clone;
+
+import org.apache.paimon.table.sink.ChannelComputer;
+
+/**
+ * {@link ChannelComputer} for distributing {@link CloneFileInfo} records into 
SnapshotHintOperator
+ * for recreate snapshot hints.
+ *
+ * <p>{@link CloneFileInfo}s are distributed by target table name, so the 
creation of snapshot hints
+ * of each table is handled by only one parallelism.
+ */
+public class SnapshotHintChannelComputer implements 
ChannelComputer<CloneFileInfo> {
+
+    private static final long serialVersionUID = 1L;
+
+    private int numChannels;
+
+    @Override
+    public void setup(int numChannels) {
+        this.numChannels = numChannels;
+    }
+
+    @Override
+    public int channel(CloneFileInfo record) {
+        int hsh = 0;
+        for (int i = 0; i < record.getTargetIdentifier().length(); i++) {
+            hsh = (hsh * 131 + record.getTargetIdentifier().charAt(i)) % 
numChannels;
+        }
+        return hsh;
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/SnapshotHintOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/SnapshotHintOperator.java
new file mode 100644
index 000000000..938119f94
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/SnapshotHintOperator.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.clone;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.FlinkCatalogFactory;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.SnapshotManager;
+
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.OptionalLong;
+import java.util.Set;
+
+/**
+ * Creates snapshot hint files after copying tables.
+ *
+ * <p>The operator collects the target identifiers of all incoming {@link CloneFileInfo}s. When the
+ * input ends, it does the following for each target table:
+ *
+ * <ul>
+ *   <li>points the earliest and latest snapshot hints at the table's highest snapshot id;
+ *   <li>deletes every other snapshot file of that table.
+ * </ul>
+ */
+public class SnapshotHintOperator extends AbstractStreamOperator<CloneFileInfo>
+        implements OneInputStreamOperator<CloneFileInfo, CloneFileInfo>, BoundedOneInput {
+
+    private final Map<String, String> targetCatalogConfig;
+
+    private Catalog targetCatalog;
+    private Set<String> identifiers;
+
+    public SnapshotHintOperator(Map<String, String> targetCatalogConfig) {
+        this.targetCatalogConfig = targetCatalogConfig;
+    }
+
+    @Override
+    public void open() throws Exception {
+        targetCatalog =
+                FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(targetCatalogConfig));
+        identifiers = new HashSet<>();
+    }
+
+    @Override
+    public void processElement(StreamRecord<CloneFileInfo> streamRecord) throws Exception {
+        String identifier = streamRecord.getValue().getTargetIdentifier();
+        identifiers.add(identifier);
+    }
+
+    @Override
+    public void endInput() throws Exception {
+        for (String identifier : identifiers) {
+            FileStoreTable targetTable =
+                    (FileStoreTable) targetCatalog.getTable(Identifier.fromString(identifier));
+            commitSnapshotHintInTargetTable(targetTable.snapshotManager());
+        }
+    }
+
+    /**
+     * Points both hints at the highest snapshot id found in the target table and deletes all other
+     * snapshot files (quietly). Does nothing if the table has no snapshot.
+     */
+    private void commitSnapshotHintInTargetTable(SnapshotManager targetTableSnapshotManager)
+            throws IOException {
+        // List the snapshot directory once. The same listing is used both to find the latest id
+        // and to decide which snapshot files to delete.
+        java.util.List<Snapshot> snapshots = targetTableSnapshotManager.safelyGetAllSnapshots();
+        OptionalLong optionalSnapshotId = snapshots.stream().mapToLong(Snapshot::id).max();
+        if (optionalSnapshotId.isPresent()) {
+            long snapshotId = optionalSnapshotId.getAsLong();
+            targetTableSnapshotManager.commitEarliestHint(snapshotId);
+            targetTableSnapshotManager.commitLatestHint(snapshotId);
+            for (Snapshot snapshot : snapshots) {
+                if (snapshot.id() != snapshotId) {
+                    targetTableSnapshotManager
+                            .fileIO()
+                            .deleteQuietly(targetTableSnapshotManager.snapshotPath(snapshot.id()));
+                }
+            }
+        }
+    }
+
+    /** Closes the target catalog, then always releases the operator's own resources. */
+    @Override
+    public void close() throws Exception {
+        try {
+            if (targetCatalog != null) {
+                targetCatalog.close();
+            }
+        } finally {
+            super.close();
+        }
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
 
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 33a43009d..38cb36713 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ 
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 ### action factories
+org.apache.paimon.flink.action.CloneActionFactory
 org.apache.paimon.flink.action.CompactActionFactory
 org.apache.paimon.flink.action.CompactDatabaseActionFactory
 org.apache.paimon.flink.action.DropPartitionActionFactory
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CloneActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CloneActionITCase.java
new file mode 100644
index 000000000..74cb44cbe
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CloneActionITCase.java
@@ -0,0 +1,532 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.clone.PickFilesUtil;
+import org.apache.paimon.flink.util.AbstractTestBase;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.Pair;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.paimon.utils.Preconditions.checkState;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT cases for {@link CloneAction}. */
+public class CloneActionITCase extends AbstractTestBase {
+
+    // ------------------------------------------------------------------------
+    //  Constructed Tests
+    // ------------------------------------------------------------------------
+
+    @Test
+    public void testCloneTable() throws Exception {
+        String sourceWarehouse = getTempDirPath("source-ware");
+        prepareData(sourceWarehouse);
+
+        String targetWarehouse = getTempDirPath("target-ware");
+        String[] args =
+                new String[] {
+                    "clone",
+                    "--warehouse",
+                    sourceWarehouse,
+                    "--database",
+                    "db1",
+                    "--table",
+                    "t1",
+                    "--target_warehouse",
+                    targetWarehouse,
+                    "--target_database",
+                    "mydb",
+                    "--target_table",
+                    "myt"
+                };
+        ActionFactory.createAction(args).get().run();
+
+        // check result
+        TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
+        tEnv.executeSql(
+                "CREATE CATALOG targetcat WITH (\n"
+                        + "  'type' = 'paimon',\n"
+                        + String.format("  'warehouse' = '%s'\n", 
targetWarehouse)
+                        + ")");
+        tEnv.executeSql("USE CATALOG targetcat");
+
+        List<String> actual = collect(tEnv, "SELECT pt, k, v FROM mydb.myt 
ORDER BY pt, k");
+        assertThat(actual)
+                .containsExactly(
+                        "+I[one, 1, 10]", "+I[one, 2, 21]", "+I[two, 1, 101]", 
"+I[two, 2, 200]");
+        compareCloneFiles(sourceWarehouse, "db1", "t1", targetWarehouse, 
"mydb", "myt");
+    }
+
+    @Test
+    public void testCloneDatabase() throws Exception {
+        String sourceWarehouse = getTempDirPath("source-ware");
+        prepareData(sourceWarehouse);
+
+        String targetWarehouse = getTempDirPath("target-ware");
+        String[] args =
+                new String[] {
+                    "clone",
+                    "--warehouse",
+                    sourceWarehouse,
+                    "--database",
+                    "db1",
+                    "--target_warehouse",
+                    targetWarehouse,
+                    "--target_database",
+                    "mydb"
+                };
+        ActionFactory.createAction(args).get().run();
+
+        // check result
+        TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
+        tEnv.executeSql(
+                "CREATE CATALOG targetcat WITH (\n"
+                        + "  'type' = 'paimon',\n"
+                        + String.format("  'warehouse' = '%s'\n", 
targetWarehouse)
+                        + ")");
+        tEnv.executeSql("USE CATALOG targetcat");
+
+        List<String> actual = collect(tEnv, "SELECT pt, k, v FROM mydb.t1 
ORDER BY pt, k");
+        assertThat(actual)
+                .containsExactly(
+                        "+I[one, 1, 10]", "+I[one, 2, 21]", "+I[two, 1, 101]", 
"+I[two, 2, 200]");
+        compareCloneFiles(sourceWarehouse, "db1", "t1", targetWarehouse, 
"mydb", "t1");
+
+        actual = collect(tEnv, "SELECT k, v FROM mydb.t2 ORDER BY k");
+        assertThat(actual)
+                .containsExactly("+I[10, 100]", "+I[20, 201]", "+I[100, 
1001]", "+I[200, 2000]");
+        compareCloneFiles(sourceWarehouse, "db1", "t2", targetWarehouse, 
"mydb", "t2");
+    }
+
+    @Test
+    public void testCloneWarehouse() throws Exception {
+        String sourceWarehouse = getTempDirPath("source-ware");
+        prepareData(sourceWarehouse);
+
+        String targetWarehouse = getTempDirPath("target-ware");
+        String[] args =
+                new String[] {
+                    "clone", "--warehouse", sourceWarehouse, 
"--target_warehouse", targetWarehouse
+                };
+        ActionFactory.createAction(args).get().run();
+
+        // check result
+        TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
+        tEnv.executeSql(
+                "CREATE CATALOG targetcat WITH (\n"
+                        + "  'type' = 'paimon',\n"
+                        + String.format("  'warehouse' = '%s'\n", 
targetWarehouse)
+                        + ")");
+        tEnv.executeSql("USE CATALOG targetcat");
+
+        List<String> actual = collect(tEnv, "SELECT pt, k, v FROM db1.t1 ORDER 
BY pt, k");
+        assertThat(actual)
+                .containsExactly(
+                        "+I[one, 1, 10]", "+I[one, 2, 21]", "+I[two, 1, 101]", 
"+I[two, 2, 200]");
+        compareCloneFiles(sourceWarehouse, "db1", "t1", targetWarehouse, 
"db1", "t1");
+
+        actual = collect(tEnv, "SELECT k, v FROM db1.t2 ORDER BY k");
+        assertThat(actual)
+                .containsExactly("+I[10, 100]", "+I[20, 201]", "+I[100, 
1001]", "+I[200, 2000]");
+        compareCloneFiles(sourceWarehouse, "db1", "t2", targetWarehouse, 
"db1", "t2");
+
+        actual = collect(tEnv, "SELECT pt, k, v FROM db2.t3 ORDER BY pt, k");
+        assertThat(actual)
+                .containsExactly(
+                        "+I[1, 1, one]",
+                        "+I[1, 2, twenty]",
+                        "+I[2, 1, banana]",
+                        "+I[2, 2, orange]");
+        compareCloneFiles(sourceWarehouse, "db2", "t3", targetWarehouse, 
"db2", "t3");
+
+        actual = collect(tEnv, "SELECT k, v FROM db2.t4 ORDER BY k");
+        assertThat(actual)
+                .containsExactly(
+                        "+I[10, one]", "+I[20, twenty]", "+I[100, banana]", 
"+I[200, orange]");
+        compareCloneFiles(sourceWarehouse, "db2", "t4", targetWarehouse, 
"db2", "t4");
+    }
+
+    /**
+     * Creates databases {@code db1} and {@code db2} in catalog {@code sourcecat} on {@code
+     * sourceWarehouse}, each holding two primary-key tables with {@code 'changelog-producer' =
+     * 'lookup'}. Every table is written by two insert jobs; the second one overwrites some keys
+     * and adds a new one.
+     */
+    private void prepareData(String sourceWarehouse) throws Exception {
+        TableEnvironment env = tableEnvironmentBuilder().batchMode().build();
+        String catalogDdl =
+                "CREATE CATALOG sourcecat WITH (\n"
+                        + "  'type' = 'paimon',\n"
+                        + String.format("  'warehouse' = '%s'\n", sourceWarehouse)
+                        + ")";
+        env.executeSql(catalogDdl);
+        env.executeSql("USE CATALOG sourcecat");
+
+        for (String database : new String[] {"db1", "db2"}) {
+            env.executeSql("CREATE DATABASE " + database);
+        }
+
+        // db1.t1: partitioned by a STRING column
+        createTableWithRows(
+                env,
+                "CREATE TABLE db1.t1 (\n"
+                        + "  pt STRING,\n"
+                        + "  k INT,\n"
+                        + "  v INT,\n"
+                        + "  PRIMARY KEY (pt, k) NOT ENFORCED\n"
+                        + ") PARTITIONED BY (pt) WITH (\n"
+                        + "  'changelog-producer' = 'lookup'\n"
+                        + ")",
+                "INSERT INTO db1.t1 VALUES ('one', 1, 10), ('one', 2, 20), ('two', 1, 100)",
+                "INSERT INTO db1.t1 VALUES ('one', 2, 21), ('two', 1, 101), ('two', 2, 200)");
+
+        // db1.t2: unpartitioned
+        createTableWithRows(
+                env,
+                "CREATE TABLE db1.t2 (\n"
+                        + "  k INT,\n"
+                        + "  v INT,\n"
+                        + "  PRIMARY KEY (k) NOT ENFORCED\n"
+                        + ") WITH (\n"
+                        + "  'changelog-producer' = 'lookup'\n"
+                        + ")",
+                "INSERT INTO db1.t2 VALUES (10, 100), (20, 200), (100, 1000)",
+                "INSERT INTO db1.t2 VALUES (20, 201), (100, 1001), (200, 2000)");
+
+        // db2.t3: partitioned by an INT column
+        createTableWithRows(
+                env,
+                "CREATE TABLE db2.t3 (\n"
+                        + "  pt INT,\n"
+                        + "  k INT,\n"
+                        + "  v STRING,\n"
+                        + "  PRIMARY KEY (pt, k) NOT ENFORCED\n"
+                        + ") PARTITIONED BY (pt) WITH (\n"
+                        + "  'changelog-producer' = 'lookup'\n"
+                        + ")",
+                "INSERT INTO db2.t3 VALUES (1, 1, 'one'), (1, 2, 'two'), (2, 1, 'apple')",
+                "INSERT INTO db2.t3 VALUES (1, 2, 'twenty'), (2, 1, 'banana'), (2, 2, 'orange')");
+
+        // db2.t4: unpartitioned
+        createTableWithRows(
+                env,
+                "CREATE TABLE db2.t4 (\n"
+                        + "  k INT,\n"
+                        + "  v STRING,\n"
+                        + "  PRIMARY KEY (k) NOT ENFORCED\n"
+                        + ") WITH (\n"
+                        + "  'changelog-producer' = 'lookup'\n"
+                        + ")",
+                "INSERT INTO db2.t4 VALUES (10, 'one'), (20, 'two'), (100, 'apple')",
+                "INSERT INTO db2.t4 VALUES (20, 'twenty'), (100, 'banana'), (200, 'orange')");
+    }
+
+    /** Executes {@code ddl}, then every insert statement in order, waiting for each insert job. */
+    private static void createTableWithRows(TableEnvironment env, String ddl, String... inserts)
+            throws Exception {
+        env.executeSql(ddl);
+        for (String insert : inserts) {
+            env.executeSql(insert).await();
+        }
+    }
+
+    @Test
+    public void testCloneWithSchemaEvolution() throws Exception {
+        // Source table is written once, gets a new column v2, then is written again.
+        String sourceWarehouse = getTempDirPath("source-ware");
+        TableEnvironment env = tableEnvironmentBuilder().batchMode().build();
+        env.executeSql(
+                "CREATE CATALOG sourcecat WITH (\n"
+                        + "  'type' = 'paimon',\n"
+                        + String.format("  'warehouse' = '%s'\n", sourceWarehouse)
+                        + ")");
+        env.executeSql("USE CATALOG sourcecat");
+
+        env.executeSql(
+                "CREATE TABLE t (\n"
+                        + "  pt STRING,\n"
+                        + "  k INT,\n"
+                        + "  v INT,\n"
+                        + "  PRIMARY KEY (pt, k) NOT ENFORCED\n"
+                        + ") PARTITIONED BY (pt) WITH (\n"
+                        + "  'changelog-producer' = 'lookup'\n"
+                        + ")");
+        env.executeSql("INSERT INTO t VALUES ('one', 1, 10), ('one', 2, 20), ('two', 1, 100)")
+                .await();
+        env.executeSql("ALTER TABLE t ADD v2 STRING AFTER v");
+        env.executeSql(
+                        "INSERT INTO t VALUES "
+                                + "('one', 2, 21, 'apple'), "
+                                + "('two', 1, 101, 'banana'), "
+                                + "('two', 2, 200, 'orange')")
+                .await();
+
+        // Clone the whole source warehouse into a fresh target warehouse.
+        String targetWarehouse = getTempDirPath("target-ware");
+        ActionFactory.createAction(
+                        new String[] {
+                            "clone",
+                            "--warehouse",
+                            sourceWarehouse,
+                            "--target_warehouse",
+                            targetWarehouse
+                        })
+                .get()
+                .run();
+
+        // Verify: rows written before the schema change read v2 as null.
+        String targetCatalogDdl =
+                "CREATE CATALOG targetcat WITH (\n"
+                        + "  'type' = 'paimon',\n"
+                        + String.format("  'warehouse' = '%s'\n", targetWarehouse)
+                        + ")";
+        env.executeSql(targetCatalogDdl);
+        env.executeSql("USE CATALOG targetcat");
+
+        List<String> rows = collect(env, "SELECT pt, k, v, v2 FROM t ORDER BY pt, k");
+        assertThat(rows)
+                .containsExactly(
+                        "+I[one, 1, 10, null]",
+                        "+I[one, 2, 21, apple]",
+                        "+I[two, 1, 101, banana]",
+                        "+I[two, 2, 200, orange]");
+        compareCloneFiles(sourceWarehouse, "default", "t", targetWarehouse, "default", "t");
+    }
+
+    /**
+     * Asserts that every file used by the latest snapshot of the target table also exists in the
+     * source table, under the same table-relative path and with the same file size.
+     */
+    private void compareCloneFiles(
+            String sourceWarehouse,
+            String sourceDb,
+            String sourceTableName,
+            String targetWarehouse,
+            String targetDb,
+            String targetTableName)
+            throws Exception {
+        FileStoreTable targetTable = getFileStoreTable(targetWarehouse, targetDb, targetTableName);
+        List<Path> targetTableFiles = PickFilesUtil.getUsedFilesForLatestSnapshot(targetTable);
+        // Without this check an empty file list would make the loop below pass vacuously.
+        assertThat(targetTableFiles).isNotEmpty();
+
+        // (absolute target path, path relative to the target table root)
+        List<Pair<Path, Path>> filesPathInfoList =
+                targetTableFiles.stream()
+                        .map(
+                                absolutePath ->
+                                        Pair.of(
+                                                absolutePath,
+                                                getPathExcludeTableRoot(
+                                                        absolutePath, targetTable.location())))
+                        .collect(Collectors.toList());
+
+        FileStoreTable sourceTable = getFileStoreTable(sourceWarehouse, sourceDb, sourceTableName);
+        Path tableLocation = sourceTable.location();
+        for (Pair<Path, Path> filesPathInfo : filesPathInfoList) {
+            // Re-root the relative path under the source table to find the original file.
+            Path sourceTableFile = new Path(tableLocation.toString() + filesPathInfo.getRight());
+            assertThat(sourceTable.fileIO().exists(sourceTableFile))
+                    .as("source file %s should exist", sourceTableFile)
+                    .isTrue();
+            assertThat(targetTable.fileIO().getFileSize(filesPathInfo.getLeft()))
+                    .as("size of cloned file %s", filesPathInfo.getLeft())
+                    .isEqualTo(sourceTable.fileIO().getFileSize(sourceTableFile));
+        }
+    }
+
+    /**
+     * Strips {@code tableRoot} from the front of {@code absolutePath} and returns the remainder as
+     * a {@link Path}. Both paths are rendered with {@link Path#toString()}, so the prefix check and
+     * the later re-rooting in {@code compareCloneFiles} compare strings produced the same way.
+     *
+     * @throws IllegalStateException if {@code absolutePath} does not start with {@code tableRoot}
+     */
+    private Path getPathExcludeTableRoot(Path absolutePath, Path tableRoot) {
+        String fileAbsolutePath = absolutePath.toString();
+        String tableRootPath = tableRoot.toString();
+
+        checkState(
+                fileAbsolutePath.startsWith(tableRootPath),
+                "File is not located under the table root. fileAbsolutePath is : "
+                        + fileAbsolutePath
+                        + ", tableRootPath is : "
+                        + tableRootPath);
+
+        return new Path(fileAbsolutePath.substring(tableRootPath.length()));
+    }
+
+    /**
+     * Opens a catalog on {@code warehouse}, loads table {@code db.tableName} as a {@link
+     * FileStoreTable} and closes the catalog again before returning.
+     */
+    private FileStoreTable getFileStoreTable(String warehouse, String db, String tableName)
+            throws Exception {
+        CatalogContext context = CatalogContext.create(new Path(warehouse));
+        Identifier identifier = Identifier.create(db, tableName);
+        try (Catalog catalog = CatalogFactory.createCatalog(context)) {
+            return (FileStoreTable) catalog.getTable(identifier);
+        }
+    }
+
+    // ------------------------------------------------------------------------
+    //  Random Tests
+    // ------------------------------------------------------------------------
+
+    @Test
+    @Timeout(180)
+    public void testCloneTableWithExpiration() throws Exception {
+        String sourceWarehouse = getTempDirPath("source-ware");
+
+        TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().parallelism(1).build();
+        tEnv.executeSql(
+                "CREATE CATALOG sourcecat WITH (\n"
+                        + "  'type' = 'paimon',\n"
+                        + String.format("  'warehouse' = '%s'\n", sourceWarehouse)
+                        + ")");
+        tEnv.executeSql("USE CATALOG sourcecat");
+        tEnv.executeSql(
+                "CREATE TABLE t (\n"
+                        + "  pt INT,\n"
+                        + "  k INT,\n"
+                        + "  v INT,\n"
+                        + "  PRIMARY KEY (pt, k) NOT ENFORCED\n"
+                        + ") PARTITIONED BY (pt) WITH (\n"
+                        + "  'changelog-producer' = 'lookup',\n"
+                        // very fast expiration
+                        + "  'snapshot.num-retained.min' = '1',\n"
+                        + "  'snapshot.num-retained.max' = '1',\n"
+                        + "  'write-buffer-size' = '256 kb'\n"
+                        + ")");
+
+        int numPartitions = 3;
+        int numKeysPerPartition = 10000;
+        tEnv.executeSql(
+                "CREATE TEMPORARY TABLE s (\n"
+                        + "  a INT\n"
+                        + ") WITH (\n"
+                        + "  'connector' = 'datagen',\n"
+                        + "  'fields.a.kind' = 'sequence',\n"
+                        + "  'fields.a.start' = '0',\n"
+                        + String.format(
+                                "  'fields.a.end' = '%d',\n",
+                                numPartitions * numKeysPerPartition - 1)
+                        + String.format(
+                                "  'number-of-rows' = '%d'\n", numPartitions * numKeysPerPartition)
+                        + ")");
+
+        // write initial data
+        tEnv.executeSql(
+                        String.format(
+                                "INSERT INTO t SELECT (a / %d) AS pt, (a %% %d) AS k, 0 AS v FROM s",
+                                numKeysPerPartition, numKeysPerPartition))
+                .await();
+
+        // A background writer keeps rewriting every key with v = rounds, so snapshots keep
+        // expiring while the clone job runs.
+        AtomicBoolean running = new AtomicBoolean(true);
+        // Exceptions thrown on the writer thread would otherwise be lost; they are collected here
+        // and asserted on after the thread has been joined.
+        List<Throwable> writerErrors = Collections.synchronizedList(new ArrayList<>());
+        Runnable runnable =
+                () -> {
+                    int rounds = 1;
+                    while (running.get()) {
+                        String sql =
+                                String.format(
+                                        "INSERT INTO t SELECT (a / %d) AS pt, (a %% %d) AS k, %d AS v FROM s",
+                                        numKeysPerPartition, numKeysPerPartition, rounds);
+                        try {
+                            tEnv.executeSql(sql).await();
+                            // Sleeping time will become longer and longer, so expiration time will
+                            // also become longer.
+                            // Thus, at the beginning of the test, clone job is very likely to fail
+                            // due to FileNotFoundException.
+                            // However, as the test progresses further, clone job should be able to
+                            // complete due to longer expiration time.
+                            Thread.sleep(100L << Math.min(rounds, 9));
+                        } catch (Exception e) {
+                            writerErrors.add(e);
+                            return;
+                        }
+                        rounds++;
+                    }
+                };
+
+        String targetWarehouse = getTempDirPath("target-ware");
+        Thread thread = new Thread(runnable);
+        thread.start();
+        try {
+            Thread.sleep(ThreadLocalRandom.current().nextInt(2000));
+            String[] args =
+                    new String[] {
+                        "clone",
+                        "--warehouse",
+                        // special file io to make cloning slower, thus more likely to face
+                        // FileNotFoundException, see CloneActionSlowFileIO
+                        "clone-slow://" + sourceWarehouse,
+                        "--target_warehouse",
+                        "clone-slow://" + targetWarehouse,
+                        "--parallelism",
+                        "1"
+                    };
+            CloneAction action = (CloneAction) ActionFactory.createAction(args).get();
+
+            StreamExecutionEnvironment env =
+                    streamExecutionEnvironmentBuilder().streamingMode().allowRestart().build();
+            action.withStreamExecutionEnvironment(env).build();
+            env.execute();
+        } finally {
+            // Always stop the writer, even when the clone job fails, so it cannot outlive the test.
+            running.set(false);
+            thread.join();
+        }
+        assertThat(writerErrors).isEmpty();
+
+        // check result
+        tEnv.executeSql(
+                "CREATE CATALOG targetcat WITH (\n"
+                        + "  'type' = 'paimon',\n"
+                        + String.format("  'warehouse' = '%s'\n", targetWarehouse)
+                        + ")");
+        tEnv.executeSql("USE CATALOG targetcat");
+        assertThat(collect(tEnv, "SELECT pt, COUNT(*) FROM t GROUP BY pt ORDER BY pt"))
+                .isEqualTo(
+                        IntStream.range(0, numPartitions)
+                                .mapToObj(i -> String.format("+I[%d, %d]", i, numKeysPerPartition))
+                                .collect(Collectors.toList()));
+        // All keys of a consistent snapshot were written by the same round, so v has one value.
+        assertThat(collect(tEnv, "SELECT COUNT(DISTINCT v) FROM t"))
+                .isEqualTo(Collections.singletonList("+I[1]"));
+    }
+
+    // ------------------------------------------------------------------------
+    //  Utils
+    // ------------------------------------------------------------------------
+
+    /**
+     * Executes {@code sql} and returns each result row rendered with {@link Row#toString()}, in
+     * the order the iterator yields them. The result iterator is always closed.
+     */
+    private List<String> collect(TableEnvironment tEnv, String sql) throws Exception {
+        List<String> rows = new ArrayList<>();
+        try (CloseableIterator<Row> iterator = tEnv.executeSql(sql).collect()) {
+            iterator.forEachRemaining(row -> rows.add(row.toString()));
+        }
+        return rows;
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CloneActionSlowFileIO.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CloneActionSlowFileIO.java
new file mode 100644
index 000000000..bc703699b
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CloneActionSlowFileIO.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileIOLoader;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.fs.PositionOutputStreamWrapper;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.fs.SeekableInputStreamWrapper;
+import org.apache.paimon.fs.local.LocalFileIO;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+
+/**
+ * Special {@link FileIO} for {@link CloneActionITCase} that makes copying files slower. Streams
+ * opened through it sleep for {@code SLEEP_MILLIS} before each {@code read(byte[], int, int)} or
+ * {@code write(byte[], int, int)} call, but only when the call stack contains a method whose name
+ * contains {@code copyBytes} (presumably the byte-copy helper used by the clone job). All other
+ * calls go straight to {@link LocalFileIO}.
+ */
+public class CloneActionSlowFileIO extends LocalFileIO {
+
+    public static final String SCHEME = "clone-slow";
+    private static final int SLEEP_MILLIS = 150;
+
+    @Override
+    public SeekableInputStream newInputStream(Path f) throws IOException {
+        return new SlowSeekableInputStreamWrapper(super.newInputStream(f));
+    }
+
+    @Override
+    public PositionOutputStream newOutputStream(Path filePath, boolean overwrite)
+            throws IOException {
+        return new SlowPositionOutputStreamWrapper(super.newOutputStream(filePath, overwrite));
+    }
+
+    /** Returns true if any frame of the current stack has a method name containing "copyBytes". */
+    private static boolean checkStackTrace() {
+        for (StackTraceElement layer : Thread.currentThread().getStackTrace()) {
+            if (layer.getMethodName().contains("copyBytes")) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Sleeps for {@code SLEEP_MILLIS} when called while bytes are being copied. If the thread is
+     * interrupted, its interrupt flag is restored and an {@link InterruptedIOException} is thrown,
+     * so cancellation stays visible to callers instead of being wrapped in an unchecked exception.
+     */
+    private static void sleepIfCopying() throws IOException {
+        if (!checkStackTrace()) {
+            return;
+        }
+        try {
+            Thread.sleep(SLEEP_MILLIS);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            InterruptedIOException interrupted =
+                    new InterruptedIOException("Interrupted while slowing down file copy");
+            interrupted.initCause(e);
+            throw interrupted;
+        }
+    }
+
+    /** Input stream wrapper that delays bulk reads performed during a copy. */
+    private static class SlowSeekableInputStreamWrapper extends SeekableInputStreamWrapper {
+
+        public SlowSeekableInputStreamWrapper(SeekableInputStream inputStream) {
+            super(inputStream);
+        }
+
+        @Override
+        public int read(byte[] b, int off, int len) throws IOException {
+            sleepIfCopying();
+            return super.read(b, off, len);
+        }
+    }
+
+    /** Output stream wrapper that delays bulk writes performed during a copy. */
+    private static class SlowPositionOutputStreamWrapper extends PositionOutputStreamWrapper {
+
+        public SlowPositionOutputStreamWrapper(PositionOutputStream outputStream) {
+            super(outputStream);
+        }
+
+        @Override
+        public void write(byte[] b, int off, int len) throws IOException {
+            sleepIfCopying();
+            super.write(b, off, len);
+        }
+    }
+
+    /** Loader for {@link CloneActionSlowFileIO}, registered for the {@code clone-slow} scheme. */
+    public static class Loader implements FileIOLoader {
+
+        @Override
+        public String getScheme() {
+            return SCHEME;
+        }
+
+        @Override
+        public FileIO load(Path path) {
+            return new CloneActionSlowFileIO();
+        }
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.fs.FileIOLoader b/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.fs.FileIOLoader
new file mode 100644
index 000000000..971a2f8c4
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.fs.FileIOLoader
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.paimon.flink.action.CloneActionSlowFileIO$Loader

Reply via email to