This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 183ca8f5de [iceberg] support migration for iceberg hive-catalog and introduce flink procedure and action (#4878)
183ca8f5de is described below

commit 183ca8f5de04d03db3fee133926a76f092282446
Author: LsomeYeah <[email protected]>
AuthorDate: Thu Feb 13 13:51:01 2025 +0800

    [iceberg] support migration for iceberg hive-catalog and introduce flink procedure and action (#4878)
    
    This closes #4878.
---
 ...dure.java => MigrateIcebergTableProcedure.java} |  59 +++---
 .../flink/procedure/MigrateTableProcedure.java     |  40 ++--
 .../flink/action/MigrateIcebergTableAction.java    |  61 ++++++
 .../action/MigrateIcebergTableActionFactory.java   |  70 +++++++
 ...dure.java => MigrateIcebergTableProcedure.java} |  31 +--
 .../flink/procedure/MigrateTableProcedure.java     |  12 +-
 .../paimon/flink/utils/TableMigrationUtils.java    |  23 +++
 .../services/org.apache.paimon.factories.Factory   |   3 +-
 .../migrate/IcebergMigrateHiveMetadata.java        | 168 +++++++++++++++
 .../migrate/IcebergMigrateHiveMetadataFactory.java |  36 ++++
 .../services/org.apache.paimon.factories.Factory   |   1 +
 paimon-hive/paimon-hive-connector-common/pom.xml   |  19 ++
 .../MigrateIcebergTableProcedureITCase.java        | 229 +++++++++++++++++++++
 paimon-hive/pom.xml                                |   1 +
 14 files changed, 675 insertions(+), 78 deletions(-)
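
For context, the commit's entry point is a new migrate_iceberg_table Flink SQL procedure. A minimal sketch of how it can be invoked, mirroring the CALL syntax exercised by the new ITCase further down (the catalog name, metastore URI, and table name are illustrative assumptions, not part of this commit):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class MigrateIcebergTableSketch {
        public static void main(String[] args) throws Exception {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
            // Assumes a Paimon catalog named my_paimon was created beforehand.
            tEnv.executeSql("USE CATALOG my_paimon");
            // Named-argument form introduced by this commit; iceberg_options tells the
            // migrator where the Iceberg metadata lives (here: a hive-catalog).
            tEnv.executeSql(
                            "CALL sys.migrate_iceberg_table("
                                    + "source_table => 'default.iceberg_tbl', "
                                    + "iceberg_options => 'metadata.iceberg.storage=hive-catalog,"
                                    + "metadata.iceberg.uri=thrift://localhost:9083')")
                    .await();
        }
    }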

diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateIcebergTableProcedure.java
similarity index 57%
copy from paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java
copy to paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateIcebergTableProcedure.java
index 196528d31c..0402d3e896 100644
--- a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java
+++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateIcebergTableProcedure.java
@@ -20,82 +20,73 @@ package org.apache.paimon.flink.procedure;
 
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.utils.TableMigrationUtils;
+import org.apache.paimon.migrate.Migrator;
 import org.apache.paimon.utils.ParameterUtils;
 
 import org.apache.flink.table.procedure.ProcedureContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/** Migrate procedure to migrate hive table to paimon table. */
-public class MigrateTableProcedure extends ProcedureBase {
+/** Migrate procedure to migrate iceberg table to paimon table. */
+public class MigrateIcebergTableProcedure extends ProcedureBase {
 
-    private static final Logger LOG = LoggerFactory.getLogger(MigrateTableProcedure.class);
+    private static final Logger LOG = LoggerFactory.getLogger(MigrateIcebergTableProcedure.class);
 
     private static final String PAIMON_SUFFIX = "_paimon_";
 
     @Override
     public String identifier() {
-        return "migrate_table";
+        return "migrate_iceberg_table";
     }
 
     public String[] call(
-            ProcedureContext procedureContext, String connector, String sourceTablePath)
+            ProcedureContext procedureContext, String sourceTablePath, String icebergProperties)
             throws Exception {
-        return call(procedureContext, connector, sourceTablePath, "");
+
+        return call(procedureContext, sourceTablePath, icebergProperties, "");
     }
 
     public String[] call(
             ProcedureContext procedureContext,
-            String connector,
             String sourceTablePath,
+            String icebergProperties,
             String properties)
             throws Exception {
-        String targetPaimonTablePath = sourceTablePath + PAIMON_SUFFIX;
-
-        Identifier sourceTableId = Identifier.fromString(sourceTablePath);
-        Identifier targetTableId = Identifier.fromString(targetPaimonTablePath);
-
-        TableMigrationUtils.getImporter(
-                        connector,
-                        catalog,
-                        sourceTableId.getDatabaseName(),
-                        sourceTableId.getObjectName(),
-                        targetTableId.getDatabaseName(),
-                        targetTableId.getObjectName(),
-                        Runtime.getRuntime().availableProcessors(),
-                        ParameterUtils.parseCommaSeparatedKeyValues(properties))
-                .executeMigrate();
 
-        LOG.info("Last step: rename " + targetTableId + " to " + 
sourceTableId);
-        catalog.renameTable(targetTableId, sourceTableId, false);
-        return new String[] {"Success"};
+        return call(
+                procedureContext,
+                sourceTablePath,
+                icebergProperties,
+                properties,
+                Runtime.getRuntime().availableProcessors());
     }
 
     public String[] call(
             ProcedureContext procedureContext,
-            String connector,
             String sourceTablePath,
+            String icebergProperties,
             String properties,
             Integer parallelism)
             throws Exception {
-        String targetPaimonTablePath = sourceTablePath + PAIMON_SUFFIX;
+        String targetTablePath = sourceTablePath + PAIMON_SUFFIX;
 
         Identifier sourceTableId = Identifier.fromString(sourceTablePath);
-        Identifier targetTableId = Identifier.fromString(targetPaimonTablePath);
+        Identifier targetTableId = Identifier.fromString(targetTablePath);
 
-        TableMigrationUtils.getImporter(
-                        connector,
+        Migrator migrator =
+                TableMigrationUtils.getIcebergImporter(
                         catalog,
                         sourceTableId.getDatabaseName(),
                         sourceTableId.getObjectName(),
                         targetTableId.getDatabaseName(),
                         targetTableId.getObjectName(),
                         parallelism,
-                        ParameterUtils.parseCommaSeparatedKeyValues(properties))
-                .executeMigrate();
+                        ParameterUtils.parseCommaSeparatedKeyValues(properties),
+                        ParameterUtils.parseCommaSeparatedKeyValues(icebergProperties));
+        LOG.info("create migrator success.");
+        migrator.executeMigrate();
 
-        LOG.info("Last step: rename " + targetTableId + " to " + 
sourceTableId);
-        catalog.renameTable(targetTableId, sourceTableId, false);
+        migrator.renameTable(false);
         return new String[] {"Success"};
     }
 }
diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java
index 196528d31c..8778b9d5e1 100644
--- a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java
+++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.procedure;
 
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.utils.TableMigrationUtils;
+import org.apache.paimon.migrate.Migrator;
 import org.apache.paimon.utils.ParameterUtils;
 
 import org.apache.flink.table.procedure.ProcedureContext;
@@ -50,25 +51,13 @@ public class MigrateTableProcedure extends ProcedureBase {
             String sourceTablePath,
             String properties)
             throws Exception {
-        String targetPaimonTablePath = sourceTablePath + PAIMON_SUFFIX;
 
-        Identifier sourceTableId = Identifier.fromString(sourceTablePath);
-        Identifier targetTableId = Identifier.fromString(targetPaimonTablePath);
-
-        TableMigrationUtils.getImporter(
-                        connector,
-                        catalog,
-                        sourceTableId.getDatabaseName(),
-                        sourceTableId.getObjectName(),
-                        targetTableId.getDatabaseName(),
-                        targetTableId.getObjectName(),
-                        Runtime.getRuntime().availableProcessors(),
-                        ParameterUtils.parseCommaSeparatedKeyValues(properties))
-                .executeMigrate();
-
-        LOG.info("Last step: rename " + targetTableId + " to " + 
sourceTableId);
-        catalog.renameTable(targetTableId, sourceTableId, false);
-        return new String[] {"Success"};
+        return call(
+                procedureContext,
+                connector,
+                sourceTablePath,
+                properties,
+                Runtime.getRuntime().availableProcessors());
     }
 
     public String[] call(
@@ -78,12 +67,13 @@ public class MigrateTableProcedure extends ProcedureBase {
             String properties,
             Integer parallelism)
             throws Exception {
-        String targetPaimonTablePath = sourceTablePath + PAIMON_SUFFIX;
+        String targetTablePath = sourceTablePath + PAIMON_SUFFIX;
 
         Identifier sourceTableId = Identifier.fromString(sourceTablePath);
-        Identifier targetTableId = Identifier.fromString(targetPaimonTablePath);
+        Identifier targetTableId = Identifier.fromString(targetTablePath);
 
-        TableMigrationUtils.getImporter(
+        Migrator migrator =
+                TableMigrationUtils.getImporter(
                         connector,
                         catalog,
                         sourceTableId.getDatabaseName(),
@@ -91,11 +81,11 @@ public class MigrateTableProcedure extends ProcedureBase {
                         targetTableId.getDatabaseName(),
                         targetTableId.getObjectName(),
                         parallelism,
-                        ParameterUtils.parseCommaSeparatedKeyValues(properties))
-                .executeMigrate();
+                        ParameterUtils.parseCommaSeparatedKeyValues(properties));
+        LOG.info("create migrator success.");
+        migrator.executeMigrate();
 
-        LOG.info("Last step: rename " + targetTableId + " to " + 
sourceTableId);
-        catalog.renameTable(targetTableId, sourceTableId, false);
+        migrator.renameTable(false);
         return new String[] {"Success"};
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateIcebergTableAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateIcebergTableAction.java
new file mode 100644
index 0000000000..1b9fcb46a9
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateIcebergTableAction.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.flink.procedure.MigrateIcebergTableProcedure;
+
+import org.apache.flink.table.procedure.DefaultProcedureContext;
+
+import java.util.Map;
+
+/** Migrate from iceberg table to paimon table. */
+public class MigrateIcebergTableAction extends ActionBase {
+
+    private final String sourceTableFullName;
+    private final String tableProperties;
+    private final Integer parallelism;
+
+    private final String icebergProperties;
+
+    public MigrateIcebergTableAction(
+            String sourceTableFullName,
+            Map<String, String> catalogConfig,
+            String icebergProperties,
+            String tableProperties,
+            Integer parallelism) {
+        super(catalogConfig);
+        this.sourceTableFullName = sourceTableFullName;
+        this.tableProperties = tableProperties;
+        this.parallelism = parallelism;
+        this.icebergProperties = icebergProperties;
+    }
+
+    @Override
+    public void run() throws Exception {
+        MigrateIcebergTableProcedure migrateIcebergTableProcedure =
+                new MigrateIcebergTableProcedure();
+        migrateIcebergTableProcedure.withCatalog(catalog);
+        migrateIcebergTableProcedure.call(
+                new DefaultProcedureContext(env),
+                sourceTableFullName,
+                icebergProperties,
+                tableProperties,
+                parallelism);
+    }
+}
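
The action is a thin wrapper over the procedure; a sketch of programmatic use, following the pattern in the new MigrateIcebergTableProcedureITCase (warehouse paths, table name, and parallelism below are illustrative assumptions):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.paimon.flink.action.MigrateIcebergTableAction;

    public class MigrateIcebergTableActionSketch {
        public static void main(String[] args) throws Exception {
            // Paimon catalog configuration; values are assumptions for this sketch.
            Map<String, String> catalogConf = new HashMap<>();
            catalogConf.put("warehouse", "/tmp/paimon_warehouse");
            catalogConf.put("cache-enabled", "false");

            MigrateIcebergTableAction action =
                    new MigrateIcebergTableAction(
                            "default.iceberg_tbl",          // source table (database.table)
                            catalogConf,
                            "metadata.iceberg.storage=hadoop-catalog,"
                                    + "iceberg_warehouse=/tmp/iceberg_warehouse",
                            "",                             // extra Paimon table options
                            4);                             // parallelism
            action.run();
        }
    }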
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateIcebergTableActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateIcebergTableActionFactory.java
new file mode 100644
index 0000000000..c85559d66b
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateIcebergTableActionFactory.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import java.util.Map;
+import java.util.Optional;
+
+/** Action Factory for {@link MigrateIcebergTableAction}. */
+public class MigrateIcebergTableActionFactory implements ActionFactory {
+
+    public static final String IDENTIFIER = "migrate_iceberg_table";
+
+    private static final String OPTIONS = "options";
+    private static final String PARALLELISM = "parallelism";
+
+    private static final String ICEBERG_OPTIONS = "iceberg_options";
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Optional<Action> create(MultipleParameterToolAdapter params) {
+
+        String sourceTable = params.get(TABLE);
+        Map<String, String> catalogConfig = catalogConfigMap(params);
+        String tableConf = params.get(OPTIONS);
+        Integer parallelism =
+                params.get(PARALLELISM) == null ? null : Integer.parseInt(params.get(PARALLELISM));
+
+        String icebergOptions = params.get(ICEBERG_OPTIONS);
+
+        MigrateIcebergTableAction migrateIcebergTableAction =
+                new MigrateIcebergTableAction(
+                        sourceTable, catalogConfig, icebergOptions, tableConf, parallelism);
+        return Optional.of(migrateIcebergTableAction);
+    }
+
+    @Override
+    public void printHelp() {
+        System.out.println(
+                "Action \"migrate_iceberg_table\" runs a migrating job from 
iceberg to paimon.");
+        System.out.println();
+
+        System.out.println("Syntax:");
+        System.out.println(
+                "  migrate_iceberg_table"
+                        + "--table <database.table_name> "
+                        + "--iceberg_options <key>=<value>[,<key>=<value>,...]"
+                        + "[--catalog_conf <key>=<value] "
+                        + "[--options <key>=<value>,<key>=<value>,...]");
+    }
+}
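
Assuming the standard Paimon Flink action entry point (the jar path, warehouse, and option values are illustrative, not part of this commit), an invocation matching the corrected help text would look like:

    <FLINK_HOME>/bin/flink run /path/to/paimon-flink-action-<version>.jar \
        migrate_iceberg_table \
        --table default.iceberg_tbl \
        --iceberg_options metadata.iceberg.storage=hive-catalog,metadata.iceberg.uri=thrift://localhost:9083 \
        --catalog_conf warehouse=/tmp/paimon_warehouse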
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateIcebergTableProcedure.java
similarity index 77%
copy from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java
copy to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateIcebergTableProcedure.java
index fff05a1a85..f43d29ed4f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateIcebergTableProcedure.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.procedure;
 
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.utils.TableMigrationUtils;
+import org.apache.paimon.migrate.Migrator;
 import org.apache.paimon.utils.ParameterUtils;
 
 import org.apache.flink.table.annotation.ArgumentHint;
@@ -29,22 +30,24 @@ import org.apache.flink.table.procedure.ProcedureContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/** Migrate procedure to migrate hive table to paimon table. */
-public class MigrateTableProcedure extends ProcedureBase {
-
-    private static final Logger LOG = LoggerFactory.getLogger(MigrateTableProcedure.class);
+/** Migrate procedure to migrate iceberg table to paimon table. */
+public class MigrateIcebergTableProcedure extends ProcedureBase {
+    private static final Logger LOG = LoggerFactory.getLogger(MigrateIcebergTableProcedure.class);
 
     private static final String PAIMON_SUFFIX = "_paimon_";
 
     @Override
     public String identifier() {
-        return "migrate_table";
+        return "migrate_iceberg_table";
     }
 
     @ProcedureHint(
             argument = {
-                @ArgumentHint(name = "connector", type = 
@DataTypeHint("STRING")),
                 @ArgumentHint(name = "source_table", type = 
@DataTypeHint("STRING")),
+                @ArgumentHint(
+                        name = "iceberg_options",
+                        type = @DataTypeHint("STRING"),
+                        isOptional = true),
                 @ArgumentHint(name = "options", type = 
@DataTypeHint("STRING"), isOptional = true),
                 @ArgumentHint(
                         name = "parallelism",
@@ -53,12 +56,13 @@ public class MigrateTableProcedure extends ProcedureBase {
             })
     public String[] call(
             ProcedureContext procedureContext,
-            String connector,
             String sourceTablePath,
+            String icebergProperties,
             String properties,
             Integer parallelism)
             throws Exception {
         properties = notnull(properties);
+        icebergProperties = notnull(icebergProperties);
 
         String targetPaimonTablePath = sourceTablePath + PAIMON_SUFFIX;
 
@@ -67,19 +71,20 @@ public class MigrateTableProcedure extends ProcedureBase {
 
         Integer p = parallelism == null ? Runtime.getRuntime().availableProcessors() : parallelism;
 
-        TableMigrationUtils.getImporter(
-                        connector,
+        Migrator migrator =
+                TableMigrationUtils.getIcebergImporter(
                         catalog,
                         sourceTableId.getDatabaseName(),
                         sourceTableId.getObjectName(),
                         targetTableId.getDatabaseName(),
                         targetTableId.getObjectName(),
                         p,
-                        ParameterUtils.parseCommaSeparatedKeyValues(properties))
-                .executeMigrate();
+                        ParameterUtils.parseCommaSeparatedKeyValues(properties),
+                        ParameterUtils.parseCommaSeparatedKeyValues(icebergProperties));
+        LOG.info("create migrator success.");
+        migrator.executeMigrate();
 
-        LOG.info("Last step: rename " + targetTableId + " to " + 
sourceTableId);
-        catalog.renameTable(targetTableId, sourceTableId, false);
+        migrator.renameTable(false);
         return new String[] {"Success"};
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java
index fff05a1a85..32a2a16dc5 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.procedure;
 
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.utils.TableMigrationUtils;
+import org.apache.paimon.migrate.Migrator;
 import org.apache.paimon.utils.ParameterUtils;
 
 import org.apache.flink.table.annotation.ArgumentHint;
@@ -67,7 +68,8 @@ public class MigrateTableProcedure extends ProcedureBase {
 
         Integer p = parallelism == null ? Runtime.getRuntime().availableProcessors() : parallelism;
 
-        TableMigrationUtils.getImporter(
+        Migrator migrator =
+                TableMigrationUtils.getImporter(
                         connector,
                         catalog,
                         sourceTableId.getDatabaseName(),
@@ -75,11 +77,11 @@ public class MigrateTableProcedure extends ProcedureBase {
                         targetTableId.getDatabaseName(),
                         targetTableId.getObjectName(),
                         p,
-                        ParameterUtils.parseCommaSeparatedKeyValues(properties))
-                .executeMigrate();
+                        ParameterUtils.parseCommaSeparatedKeyValues(properties));
+        LOG.info("create migrator success.");
+        migrator.executeMigrate();
 
-        LOG.info("Last step: rename " + targetTableId + " to " + 
sourceTableId);
-        catalog.renameTable(targetTableId, sourceTableId, false);
+        migrator.renameTable(false);
         return new String[] {"Success"};
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java
index b59c3592a9..4e7268c6f1 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java
@@ -22,7 +22,9 @@ import org.apache.paimon.catalog.CachingCatalog;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.hive.HiveCatalog;
 import org.apache.paimon.hive.migrate.HiveMigrator;
+import org.apache.paimon.iceberg.migrate.IcebergMigrator;
 import org.apache.paimon.migrate.Migrator;
+import org.apache.paimon.options.Options;
 
 import java.util.List;
 import java.util.Map;
@@ -60,6 +62,27 @@ public class TableMigrationUtils {
         }
     }
 
+    public static Migrator getIcebergImporter(
+            Catalog catalog,
+            String sourceDatabase,
+            String sourceTableName,
+            String targetDatabase,
+            String targetTableName,
+            Integer parallelism,
+            Map<String, String> options,
+            Map<String, String> icebergOptions) {
+
+        Options icebergConf = new Options(icebergOptions);
+        return new IcebergMigrator(
+                catalog,
+                targetDatabase,
+                targetTableName,
+                sourceDatabase,
+                sourceTableName,
+                icebergConf,
+                parallelism);
+    }
+
     public static List<Migrator> getImporters(
             String connector,
             Catalog catalog,
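
Both procedure variants funnel into the getIcebergImporter helper above; used directly it would look roughly like the following (the Catalog handle, table names, and option values are assumptions of the sketch):

    import java.util.Collections;

    import org.apache.paimon.catalog.Catalog;
    import org.apache.paimon.flink.utils.TableMigrationUtils;
    import org.apache.paimon.migrate.Migrator;

    public class IcebergImporterSketch {
        static void migrate(Catalog catalog) throws Exception {
            Migrator migrator =
                    TableMigrationUtils.getIcebergImporter(
                            catalog,
                            "default", "iceberg_tbl",          // source database / table
                            "default", "iceberg_tbl_paimon_",  // temporary target table
                            Runtime.getRuntime().availableProcessors(),
                            Collections.emptyMap(),            // Paimon table options
                            Collections.singletonMap(
                                    "metadata.iceberg.storage", "hive-catalog"));
            migrator.executeMigrate();
            // Final step performed by the procedures: swap the migrated table
            // back to the original source name.
            migrator.renameTable(false);
        }
    }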
diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 6f6becf85f..efaa25627d 100644
--- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -85,4 +85,5 @@ org.apache.paimon.flink.procedure.CloneProcedure
 org.apache.paimon.flink.procedure.CompactManifestProcedure
 org.apache.paimon.flink.procedure.RefreshObjectTableProcedure
 org.apache.paimon.flink.procedure.RemoveUnexistingFilesProcedure
-org.apache.paimon.flink.procedure.ClearConsumersProcedure
\ No newline at end of file
+org.apache.paimon.flink.procedure.ClearConsumersProcedure
+org.apache.paimon.flink.procedure.MigrateIcebergTableProcedure
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHiveMetadata.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHiveMetadata.java
new file mode 100644
index 0000000000..3c0d7da024
--- /dev/null
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHiveMetadata.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.migrate;
+
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.client.ClientPool;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.hive.HiveCatalog;
+import org.apache.paimon.hive.pool.CachedClientPool;
+import org.apache.paimon.iceberg.IcebergOptions;
+import org.apache.paimon.iceberg.metadata.IcebergMetadata;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.utils.Preconditions;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Get the latest snapshot metadata of an iceberg table in hive. */
+public class IcebergMigrateHiveMetadata implements IcebergMigrateMetadata {
+    private static final Logger LOG = LoggerFactory.getLogger(IcebergMigrateHiveMetadata.class);
+
+    public static final String TABLE_TYPE_PROP = "table_type";
+    public static final String ICEBERG_TABLE_TYPE_VALUE = "iceberg";
+    private static final String ICEBERG_METADATA_LOCATION = "metadata_location";
+
+    private FileIO fileIO;
+    private final Options icebergOptions;
+    private final Identifier icebergIdentifier;
+
+    private final ClientPool<IMetaStoreClient, TException> clients;
+
+    private String metadataLocation = null;
+
+    private IcebergMetadata icebergMetadata;
+
+    public IcebergMigrateHiveMetadata(Identifier icebergIdentifier, Options icebergOptions) {
+
+        this.icebergIdentifier = icebergIdentifier;
+        this.icebergOptions = icebergOptions;
+
+        String uri = icebergOptions.get(IcebergOptions.URI);
+        String hiveConfDir = icebergOptions.get(IcebergOptions.HIVE_CONF_DIR);
+        String hadoopConfDir = icebergOptions.get(IcebergOptions.HADOOP_CONF_DIR);
+        Configuration hadoopConf = new Configuration();
+        hadoopConf.setClassLoader(IcebergMigrateHiveMetadata.class.getClassLoader());
+        HiveConf hiveConf = HiveCatalog.createHiveConf(hiveConfDir, hadoopConfDir, hadoopConf);
+
+        icebergOptions.toMap().forEach(hiveConf::set);
+        if (uri != null) {
+            hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, uri);
+        }
+
+        if (hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname) == null) {
+            LOG.error(
+                    "Can't find hive metastore uri to connect: "
+                            + "either set {} in iceberg options or set 
hive.metastore.uris "
+                            + "in hive-site.xml or hadoop configurations. "
+                            + "Will use empty metastore uris, which means we 
may use a embedded metastore. "
+                            + "Please make sure hive metastore uri for iceberg 
table is correctly set as expected.",
+                    IcebergOptions.URI.key());
+        }
+
+        this.clients =
+                new CachedClientPool(
+                        hiveConf,
+                        icebergOptions,
+                        icebergOptions.getString(IcebergOptions.HIVE_CLIENT_CLASS));
+    }
+
+    @Override
+    public IcebergMetadata icebergMetadata() {
+        try {
+            boolean isExist = tableExists(icebergIdentifier);
+            if (!isExist) {
+                throw new RuntimeException(
+                        String.format(
+                                "iceberg table %s is not existed in hive 
metastore",
+                                icebergIdentifier));
+            }
+            Table icebergHiveTable =
+                    clients.run(
+                            client ->
+                                    client.getTable(
+                                            icebergIdentifier.getDatabaseName(),
+                                            icebergIdentifier.getTableName()));
+            // check whether it is an iceberg table
+            String tableType = icebergHiveTable.getParameters().get(TABLE_TYPE_PROP);
+            Preconditions.checkArgument(
+                    tableType != null && tableType.equalsIgnoreCase(ICEBERG_TABLE_TYPE_VALUE),
+                    "not an iceberg table: %s (table-type=%s)",
+                    icebergIdentifier.toString(),
+                    tableType);
+
+            metadataLocation = icebergHiveTable.getParameters().get(ICEBERG_METADATA_LOCATION);
+            LOG.info("iceberg latest metadata location: {}", metadataLocation);
+
+            fileIO = FileIO.get(new Path(metadataLocation), CatalogContext.create(icebergOptions));
+
+            icebergMetadata = IcebergMetadata.fromPath(fileIO, new Path(metadataLocation));
+            return icebergMetadata;
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    String.format("Failed to read Iceberg metadata from path 
%s", metadataLocation),
+                    e);
+        }
+    }
+
+    @Override
+    public String icebergLatestMetadataLocation() {
+        return metadataLocation;
+    }
+
+    @Override
+    public void deleteOriginTable() {
+        LOG.info("Iceberg table in hive to be deleted:{}", 
icebergIdentifier.toString());
+        try {
+            clients.run(
+                    client -> {
+                        client.dropTable(
+                                icebergIdentifier.getDatabaseName(),
+                                icebergIdentifier.getTableName(),
+                                true,
+                                true);
+                        return null;
+                    });
+
+            // The iceberg table in hive is an external table; client.dropTable only deletes
+            // its metadata in hive, so we delete the data files manually.
+            Path icebergTablePath = new Path(icebergMetadata.location());
+
+            if (fileIO.exists(icebergTablePath) && fileIO.isDir(icebergTablePath)) {
+                fileIO.deleteDirectoryQuietly(icebergTablePath);
+            }
+        } catch (Exception e) {
+            LOG.warn("exception occurred when deleting origin table", e);
+        }
+    }
+
+    private boolean tableExists(Identifier identifier) throws Exception {
+        return clients.run(
+                client ->
+                        client.tableExists(
+                                identifier.getDatabaseName(), identifier.getTableName()));
+    }
+}
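
In the migration flow this class is created through IcebergMigrateHiveMetadataFactory; a standalone sketch of what it does (the metastore URI and table identifier are assumed placeholders):

    import org.apache.paimon.catalog.Identifier;
    import org.apache.paimon.iceberg.IcebergOptions;
    import org.apache.paimon.iceberg.metadata.IcebergMetadata;
    import org.apache.paimon.iceberg.migrate.IcebergMigrateHiveMetadata;
    import org.apache.paimon.options.Options;

    public class IcebergHiveMetadataSketch {
        public static void main(String[] args) {
            Options icebergOptions = new Options();
            icebergOptions.set(IcebergOptions.URI, "thrift://localhost:9083"); // assumed URI
            IcebergMigrateHiveMetadata hiveMetadata =
                    new IcebergMigrateHiveMetadata(
                            Identifier.create("default", "iceberg_tbl"), icebergOptions);
            // Looks up the table in the hive metastore, verifies table_type=iceberg,
            // and loads the latest metadata file recorded under metadata_location.
            IcebergMetadata metadata = hiveMetadata.icebergMetadata();
            System.out.println(hiveMetadata.icebergLatestMetadataLocation());
        }
    }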
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHiveMetadataFactory.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHiveMetadataFactory.java
new file mode 100644
index 0000000000..0a539cdec2
--- /dev/null
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHiveMetadataFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.migrate;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.iceberg.IcebergOptions;
+import org.apache.paimon.options.Options;
+
+/** Factory to create {@link IcebergMigrateHiveMetadata}. */
+public class IcebergMigrateHiveMetadataFactory implements IcebergMigrateMetadataFactory {
+    @Override
+    public String identifier() {
+        return IcebergOptions.StorageType.HIVE_CATALOG.toString() + "_migrate";
+    }
+
+    @Override
+    public IcebergMigrateHiveMetadata create(Identifier icebergIdentifier, Options icebergOptions) {
+        return new IcebergMigrateHiveMetadata(icebergIdentifier, icebergOptions);
+    }
+}
diff --git a/paimon-hive/paimon-hive-catalog/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-hive/paimon-hive-catalog/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 26f0944d91..608f034659 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ b/paimon-hive/paimon-hive-catalog/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -16,3 +16,4 @@
 org.apache.paimon.hive.HiveCatalogFactory
 org.apache.paimon.hive.HiveCatalogLockFactory
 org.apache.paimon.iceberg.IcebergHiveMetadataCommitterFactory
+org.apache.paimon.iceberg.migrate.IcebergMigrateHiveMetadataFactory
diff --git a/paimon-hive/paimon-hive-connector-common/pom.xml b/paimon-hive/paimon-hive-connector-common/pom.xml
index 397dfc9421..a79f2002ea 100644
--- a/paimon-hive/paimon-hive-connector-common/pom.xml
+++ b/paimon-hive/paimon-hive-connector-common/pom.xml
@@ -562,6 +562,25 @@ under the License.
             </exclusions>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-flink-${iceberg.flink.version}</artifactId>
+            <version>${iceberg.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.orc</groupId>
+                    <artifactId>orc-core</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-metrics-dropwizard</artifactId>
+            <version>${iceberg.flink.dropwizard.version}</version>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateIcebergTableProcedureITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateIcebergTableProcedureITCase.java
new file mode 100644
index 0000000000..1875b08eba
--- /dev/null
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateIcebergTableProcedureITCase.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.hive.procedure;
+
+import org.apache.paimon.flink.action.ActionITCaseBase;
+import org.apache.paimon.flink.action.MigrateIcebergTableAction;
+import org.apache.paimon.flink.procedure.MigrateIcebergTableProcedure;
+import org.apache.paimon.hive.TestHiveMetastore;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
+
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.types.Row;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+
+/** Tests for {@link MigrateIcebergTableProcedure}. */
+public class MigrateIcebergTableProcedureITCase extends ActionITCaseBase {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(MigrateIcebergTableProcedureITCase.class);
+
+    private static final TestHiveMetastore TEST_HIVE_METASTORE = new TestHiveMetastore();
+
+    private static final int PORT = 9087;
+
+    @TempDir java.nio.file.Path iceTempDir;
+    @TempDir java.nio.file.Path paiTempDir;
+
+    @BeforeEach
+    public void beforeEach() {
+        TEST_HIVE_METASTORE.start(PORT);
+    }
+
+    @AfterEach
+    public void afterEach() throws Exception {
+        TEST_HIVE_METASTORE.stop();
+    }
+
+    @Test
+    public void testMigrateIcebergTableProcedure() throws Exception {
+        TableEnvironment tEnv =
+                TableEnvironmentImpl.create(
+                        EnvironmentSettings.newInstance().inBatchMode().build());
+
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        boolean isPartitioned = random.nextBoolean();
+        boolean icebergIsHive = random.nextBoolean();
+        boolean paimonIsHive = random.nextBoolean();
+        boolean isNamedArgument = random.nextBoolean();
+
+        // Logging the random arguments for debugging
+        LOG.info(
+                "isPartitioned:{}, icebergIsHive:{}, paimonIsHive:{}, 
isNamedArgument:{}",
+                isPartitioned,
+                icebergIsHive,
+                paimonIsHive,
+                isNamedArgument);
+
+        // create iceberg catalog, database, table, and insert some data into the iceberg table
+        tEnv.executeSql(icebergCatalogDdl(icebergIsHive));
+
+        String icebergTable = "iceberg_" + UUID.randomUUID().toString().replace("-", "_");
+        tEnv.executeSql("USE CATALOG my_iceberg");
+        if (isPartitioned) {
+            tEnv.executeSql(
+                    String.format(
+                            "CREATE TABLE `default`.`%s` (id string, id2 int, 
id3 int) PARTITIONED BY (id3)",
+                            icebergTable));
+        } else {
+            tEnv.executeSql(
+                    String.format(
+                            "CREATE TABLE `default`.`%s` (id string, id2 int, 
id3 int) WITH ('format-version'='2')",
+                            icebergTable));
+        }
+        tEnv.executeSql(
+                        String.format(
+                                "INSERT INTO `default`.`%s` VALUES 
('a',1,1),('b',2,2),('c',3,3)",
+                                icebergTable))
+                .await();
+
+        tEnv.executeSql(paimonCatalogDdl(paimonIsHive));
+        tEnv.executeSql("USE CATALOG my_paimon");
+
+        String icebergOptions =
+                icebergIsHive
+                        ? "metadata.iceberg.storage=hive-catalog, 
metadata.iceberg.uri=thrift://localhost:"
+                                + PORT
+                        : 
"metadata.iceberg.storage=hadoop-catalog,iceberg_warehouse=" + iceTempDir;
+        if (isNamedArgument) {
+            tEnv.executeSql(
+                            String.format(
+                                    "CALL 
sys.migrate_iceberg_table(source_table => 'default.%s', "
+                                            + "iceberg_options => '%s')",
+                                    icebergTable, icebergOptions))
+                    .await();
+        } else {
+            tEnv.executeSql(
+                            String.format(
+                                    "CALL 
sys.migrate_iceberg_table('default.%s','%s')",
+                                    icebergTable, icebergOptions))
+                    .await();
+        }
+
+        Assertions.assertThatList(
+                        Arrays.asList(Row.of("a", 1, 1), Row.of("b", 2, 2), Row.of("c", 3, 3)))
+                .containsExactlyInAnyOrderElementsOf(
+                        ImmutableList.copyOf(
+                                tEnv.executeSql(
+                                                String.format(
+                                                        "SELECT * FROM 
`default`.`%s`",
+                                                        icebergTable))
+                                        .collect()));
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void testMigrateIcebergTableAction(boolean isPartitioned) throws Exception {
+        TableEnvironment tEnv =
+                TableEnvironmentImpl.create(
+                        EnvironmentSettings.newInstance().inBatchMode().build());
+
+        // create iceberg catalog, database, table, and insert some data into the iceberg table
+        tEnv.executeSql(icebergCatalogDdl(true));
+
+        String icebergTable = "iceberg_" + UUID.randomUUID().toString().replace("-", "_");
+        tEnv.executeSql("USE CATALOG my_iceberg");
+        if (isPartitioned) {
+            tEnv.executeSql(
+                    String.format(
+                            "CREATE TABLE `default`.`%s` (id string, id2 int, 
id3 int) PARTITIONED BY (id3)",
+                            icebergTable));
+        } else {
+            tEnv.executeSql(
+                    String.format(
+                            "CREATE TABLE `default`.`%s` (id string, id2 int, 
id3 int) WITH ('format-version'='2')",
+                            icebergTable));
+        }
+        tEnv.executeSql(
+                        String.format(
+                                "INSERT INTO `default`.`%s` VALUES 
('a',1,1),('b',2,2),('c',3,3)",
+                                icebergTable))
+                .await();
+
+        String icebergOptions =
+                "metadata.iceberg.storage=hive-catalog, 
metadata.iceberg.uri=thrift://localhost:"
+                        + PORT;
+
+        Map<String, String> catalogConf = new HashMap<>();
+        catalogConf.put("warehouse", paiTempDir.toString());
+        catalogConf.put("metastore", "hive");
+        catalogConf.put("uri", "thrift://localhost:" + PORT);
+        catalogConf.put("cache-enabled", "false");
+
+        MigrateIcebergTableAction migrateIcebergTableAction =
+                new MigrateIcebergTableAction(
+                        "default." + icebergTable, catalogConf, 
icebergOptions, "", 6);
+        migrateIcebergTableAction.run();
+
+        tEnv.executeSql(paimonCatalogDdl(true));
+        tEnv.executeSql("USE CATALOG my_paimon");
+        Assertions.assertThatList(
+                        Arrays.asList(Row.of("a", 1, 1), Row.of("b", 2, 2), Row.of("c", 3, 3)))
+                .containsExactlyInAnyOrderElementsOf(
+                        ImmutableList.copyOf(
+                                tEnv.executeSql(
+                                                String.format(
+                                                        "SELECT * FROM 
`my_paimon`.`default`.`%s`",
+                                                        icebergTable))
+                                        .collect()));
+    }
+
+    private String icebergCatalogDdl(boolean isHive) {
+        return isHive
+                ? String.format(
+                        "CREATE CATALOG my_iceberg WITH "
+                                + "( 'type' = 'iceberg', 'catalog-type' = 
'hive', 'uri' = 'thrift://localhost:%s', "
+                                + "'warehouse' = '%s', 'cache-enabled' = 
'false')",
+                        PORT, iceTempDir)
+                : String.format(
+                        "CREATE CATALOG my_iceberg WITH "
+                                + "( 'type' = 'iceberg', 'catalog-type' = 
'hadoop',"
+                                + "'warehouse' = '%s', 'cache-enabled' = 
'false' )",
+                        iceTempDir);
+    }
+
+    private String paimonCatalogDdl(boolean isHive) {
+        return isHive
+                ? String.format(
+                        "CREATE CATALOG my_paimon WITH "
+                                + "( 'type' = 'paimon', 'metastore' = 'hive', 
'uri' = 'thrift://localhost:%s', "
+                                + "'warehouse' = '%s', 'cache-enabled' = 
'false' )",
+                        PORT, iceTempDir)
+                : String.format(
+                        "CREATE CATALOG my_paimon WITH ('type' = 'paimon', 
'warehouse' = '%s')",
+                        paiTempDir);
+    }
+}
diff --git a/paimon-hive/pom.xml b/paimon-hive/pom.xml
index c97aceb1b8..92f32d1336 100644
--- a/paimon-hive/pom.xml
+++ b/paimon-hive/pom.xml
@@ -50,6 +50,7 @@ under the License.
         <reflections.version>0.9.8</reflections.version>
         <aws.version>1.12.319</aws.version>
         <iceberg.flink.version>1.19</iceberg.flink.version>
+        <iceberg.flink.dropwizard.version>1.19.0</iceberg.flink.dropwizard.version>
     </properties>
 
     <dependencies>

