This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new c7a17b2b3 [flink][hive] Introduce procedure to migrate table from hive 
to paimon (#2368)
c7a17b2b3 is described below

commit c7a17b2b37e48a04e49e06b56fd3c58cc969cc66
Author: YeJunHao <[email protected]>
AuthorDate: Tue Nov 28 11:57:28 2023 +0800

    [flink][hive] Introduce procedure to migrate table from hive to paimon 
(#2368)
---
 docs/content/migration/migration-from-hive.md      | 106 ++++++
 .../java/org/apache/paimon/data/BinaryWriter.java  |   8 +-
 .../apache/paimon/format/TableStatsExtractor.java  |  17 +
 .../org/apache/paimon/utils/ParameterUtils.java    |   6 +-
 .../org/apache/paimon/catalog/AbstractCatalog.java |   4 +
 .../org/apache/paimon/migrate/FileMetaUtils.java   | 186 +++++++++++
 .../java/org/apache/paimon/migrate/Migrator.java   |  13 +-
 .../paimon/stats/TestTableStatsExtractor.java      |   9 +-
 paimon-flink/paimon-flink-common/pom.xml           |   1 +
 .../java/org/apache/paimon/flink/FlinkCatalog.java |   7 +-
 .../paimon/flink/action/MigrateTableAction.java    |  53 +++
 .../flink/action/MigrateTableActionFactory.java    |  62 ++++
 .../flink/procedure/MigrateFileProcedure.java      |  65 ++++
 .../flink/procedure/MigrateTableProcedure.java     |  72 +++++
 .../paimon/flink/procedure/ProcedureBase.java      |   2 +-
 .../paimon/flink/utils/TableMigrationUtils.java    |  51 +++
 .../services/org.apache.paimon.factories.Factory   |   3 +
 .../format/orc/filter/OrcTableStatsExtractor.java  |  37 ++-
 .../format/parquet/ParquetTableStatsExtractor.java |  29 +-
 .../apache/paimon/format/parquet/ParquetUtil.java  |   8 +-
 .../java/org/apache/paimon/hive/HiveCatalog.java   |  14 +-
 .../apache/paimon/hive/migrate/HiveMigrator.java   | 360 +++++++++++++++++++++
 .../org/apache/paimon/hive/TestHiveMetastore.java  |  50 ++-
 paimon-hive/paimon-hive-connector-common/pom.xml   |  16 +
 .../hive/procedure/MigrateFileProcedureTest.java   | 122 +++++++
 .../hive/procedure/MigrateTableProcedureTest.java  | 216 +++++++++++++
 26 files changed, 1455 insertions(+), 62 deletions(-)

diff --git a/docs/content/migration/migration-from-hive.md 
b/docs/content/migration/migration-from-hive.md
new file mode 100644
index 000000000..831d8102b
--- /dev/null
+++ b/docs/content/migration/migration-from-hive.md
@@ -0,0 +1,106 @@
+---
+title: "Migration From Hive"
+weight: 1
+type: docs
+aliases:
+- /migration/migration-from-hive.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Hive Table Migration
+
+Apache Hive supports the ORC and Parquet file formats, and tables stored in these formats can be migrated to Paimon.
+When migrating data to a Paimon table, the original table will permanently disappear, so please back up your data if you
+still need the original table. The migrated table will be an [unaware-bucket append-only table]({{< ref "concepts/append-only-table#append-for-scalable-table" >}}).
+
+You can use the Paimon Hive catalog with the Migrate Table Procedure and the Migrate File Procedure to fully migrate a table from Hive to Paimon.
+
+* Migrate Table Procedure: the Paimon table does not exist yet; use this procedure to upgrade the Hive table to a Paimon table. The Hive table will disappear after the action is done.
+* Migrate File Procedure: the Paimon table already exists; use this procedure to migrate the files of the Hive table into the Paimon table. **Note that the Hive table will also disappear after the action is done.**
+
+These two procedures currently only support Hive tables stored in the "orc" and "parquet" file formats. If your table partitions use another format, such as avro, these procedures will fail.
+Avro support is planned for the future. For now, please make sure your table partitions are stored as "orc" or "parquet".
+
+<span style="color: red; "> **We highly recommend backing up the Hive table data before migrating, because the migration is not atomic. If it is interrupted while migrating, you may lose your data.** </span>
+
+## Example for Migration
+
+**Migrate Hive Table**
+
+Command: <br>
+
+***CALL <font color="green">sys.migrate_table</font>(&#39;hive&#39;, &#39;&lt;hive_database&gt;.&lt;hive_tablename&gt;&#39;, &#39;&lt;paimon_tableconf&gt;&#39;);***
+
+**Example**
+
+```sql
+CREATE CATALOG PAIMON WITH ('type'='paimon', 'metastore' = 'hive', 'uri' = 
'thrift://localhost:9083', 'warehouse'='/path/to/warehouse/');
+
+USE CATALOG PAIMON;
+
+CALL sys.migrate_table('hive', 'default.hivetable', 'file.format=orc');
+```
+After the call, "hivetable" is fully converted to Paimon format, and reading or writing the table the old "Hive way" will fail.
+You can add table properties while importing via `sys.migrate_table('<connector>', '<database>.<tablename>', '<tableproperties>')`.
+The `<tableproperties>` here should be separated by ",". For example:
+
+```sql
+CALL sys.migrate_table('hive', 'my_db.wait_to_upgrade', 'file.format=orc,read.batch-size=2096,write-only=true');
+```
+
+If your Flink version is below 1.17, you can use the Flink action to achieve this:
+```bash
+<FLINK_HOME>/bin/flink run \
+/path/to/paimon-flink-action-{{< version >}}.jar \
+migrate_table \
+--warehouse <warehouse-path> \
+--source-table-type hive \
+--source-table-id <database.table-name> \
+[--catalog-conf <paimon-catalog-conf> [--catalog-conf <paimon-catalog-conf> ...]] \
+[--table-properties <key>=<value>,<key>=<value>,...]
+```
+
+Example:
+```bash
+<FLINK_HOME>/bin/flink run ./paimon-flink-action-0.7-SNAPSHOT.jar migrate_table \
+--warehouse /path/to/warehouse \
+--catalog-conf uri=thrift://localhost:9083 \
+--catalog-conf metastore=hive \
+--source-table-type hive \
+--source-table-id default.hive_or_paimon
+```
+
+**Migrate Hive File**
+
+Command: <br>
+
+***CALL <font color="green">sys.migrate_file</font>(&#39;hive&#39;, &#39;&lt;hive_database&gt;.&lt;hive_table_name&gt;&#39;, &#39;&lt;paimon_database&gt;.&lt;paimon_tablename&gt;&#39;);***
+
+**Example**
+
+```sql
+CREATE CATALOG PAIMON WITH ('type'='paimon', 'metastore' = 'hive', 'uri' = 
'thrift://localhost:9083', 'warehouse'='/path/to/warehouse/');
+
+USE CATALOG PAIMON;
+
+CALL sys.migrate_file('hive', 'default.hivetable', 'default.paimontable');
+```
+After the call, "hivetable" will disappear, and all of its files will be moved and renamed into the Paimon table directory. "paimontable" here must have the same
+partition keys as "hivetable", and "paimontable" must be in unaware-bucket mode.
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/BinaryWriter.java 
b/paimon-common/src/main/java/org/apache/paimon/data/BinaryWriter.java
index 0da3bced2..1c037425c 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/BinaryWriter.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/BinaryWriter.java
@@ -144,9 +144,9 @@ public interface BinaryWriter {
     }
 
     /**
-     * Creates an accessor for setting the elements of an array writer during 
runtime.
+     * Creates an accessor for setting the elements of a binary writer during 
runtime.
      *
-     * @param elementType the element type of the array
+     * @param elementType the element type
      */
     static ValueSetter createValueSetter(DataType elementType) {
         // ordered by type root definition
@@ -208,8 +208,8 @@ public interface BinaryWriter {
         }
     }
 
-    /** Accessor for setting the elements of an array writer during runtime. */
+    /** Accessor for setting the elements of a binary writer during runtime. */
     interface ValueSetter extends Serializable {
-        void setValue(BinaryArrayWriter writer, int pos, Object value);
+        void setValue(BinaryWriter writer, int pos, Object value);
     }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/format/TableStatsExtractor.java 
b/paimon-common/src/main/java/org/apache/paimon/format/TableStatsExtractor.java
index e5e8e3eb0..4fae3d5e5 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/format/TableStatsExtractor.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/format/TableStatsExtractor.java
@@ -20,6 +20,7 @@ package org.apache.paimon.format;
 
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.utils.Pair;
 
 import java.io.IOException;
 
@@ -27,4 +28,20 @@ import java.io.IOException;
 public interface TableStatsExtractor {
 
     FieldStats[] extract(FileIO fileIO, Path path) throws IOException;
+
+    Pair<FieldStats[], FileInfo> extractWithFileInfo(FileIO fileIO, Path path) 
throws IOException;
+
+    /**
+     * Information read from a physical data file, returned next to its {@link FieldStats} by
+     * {@link #extractWithFileInfo(FileIO, Path)}.
+     *
+     * <p>Instances are immutable.
+     */
+    class FileInfo {
+
+        private final long rowCount;
+
+        /** @param rowCount number of rows contained in the physical file */
+        public FileInfo(long rowCount) {
+            this.rowCount = rowCount;
+        }
+
+        /** Returns the number of rows contained in the physical file. */
+        public long getRowCount() {
+            return rowCount;
+        }
+    }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/utils/ParameterUtils.java 
b/paimon-common/src/main/java/org/apache/paimon/utils/ParameterUtils.java
index 693ba494e..5d12cf27f 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/ParameterUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/ParameterUtils.java
@@ -36,8 +36,10 @@ public class ParameterUtils {
 
     public static Map<String, String> parseCommaSeparatedKeyValues(String 
keyValues) {
         Map<String, String> kvs = new HashMap<>();
-        for (String kvString : keyValues.split(",")) {
-            parseKeyValueString(kvs, kvString);
+        if (!StringUtils.isBlank(keyValues)) {
+            for (String kvString : keyValues.split(",")) {
+                parseKeyValueString(kvs, kvString);
+            }
         }
         return kvs;
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index a54ea924d..c01baa774 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -350,6 +350,10 @@ public abstract class AbstractCatalog implements Catalog {
         tableDefaultOptions.forEach(options::putIfAbsent);
     }
 
+    /** Exposes the {@link FileIO} held by this catalog. */
+    public FileIO fileIO() {
+        return this.fileIO;
+    }
+
     private String[] tableAndSystemName(Identifier identifier) {
         String[] splits = StringUtils.split(identifier.getObjectName(), 
SYSTEM_TABLE_SPLITTER);
         if (splits.length != 2) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java 
b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
new file mode 100644
index 000000000..c39a68001
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.migrate;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryRowWriter;
+import org.apache.paimon.data.BinaryWriter;
+import org.apache.paimon.format.FieldStats;
+import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.format.TableStatsExtractor;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileStatus;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.NewFilesIncrement;
+import org.apache.paimon.statistics.FieldStatsCollector;
+import org.apache.paimon.stats.BinaryTableStats;
+import org.apache.paimon.stats.FieldStatsArraySerializer;
+import org.apache.paimon.table.AbstractFileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.StatsCollectorFactories;
+import org.apache.paimon.utils.TypeUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+/**
+ * Utilities to construct Paimon {@link DataFileMeta}s for external data files (for example the
+ * files of a Hive table) so they can be committed to a Paimon table without being rewritten.
+ */
+public class FileMetaUtils {
+
+    /**
+     * Moves the data files directly under {@code location} into {@code dir} and builds a {@link
+     * DataFileMeta} for each of them.
+     *
+     * @param fileIO file IO used to list, rename and read the files
+     * @param format file format of the external files; also appended as file name suffix if missing
+     * @param location directory holding the external data files; sub-directories are ignored
+     * @param paimonTable target table; its row type, options and schema id drive stats extraction
+     * @param filter additional predicate a file must satisfy to be migrated
+     * @param dir directory the files are renamed into
+     * @param rollback receives a {@code newPath -> originPath} entry for every file that has
+     *     actually been renamed, presumably so callers can undo the renames on failure
+     * @return one file meta per migrated file
+     * @throws IOException if listing {@code location} fails
+     */
+    public static List<DataFileMeta> construct(
+            FileIO fileIO,
+            String format,
+            String location,
+            Table paimonTable,
+            Predicate<FileStatus> filter,
+            Path dir,
+            Map<Path, Path> rollback)
+            throws IOException {
+        List<FileStatus> fileStatuses =
+                Arrays.stream(fileIO.listStatus(new Path(location)))
+                        .filter(s -> !s.isDir())
+                        .filter(filter)
+                        .collect(Collectors.toList());
+
+        return fileStatuses.stream()
+                .map(
+                        status ->
+                                constructFileMeta(
+                                        format, status, fileIO, paimonTable, dir, rollback))
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Creates a {@link CommitMessage} that adds {@code dataFileMetas} as new files of bucket 0 in
+     * {@code partition}; all other increments are empty.
+     */
+    public static CommitMessage commitFile(BinaryRow partition, List<DataFileMeta> dataFileMetas) {
+        return new CommitMessageImpl(
+                partition,
+                0,
+                new NewFilesIncrement(dataFileMetas, Collections.emptyList()),
+                new CompactIncrement(
+                        Collections.emptyList(), Collections.emptyList(), Collections.emptyList()));
+    }
+
+    // ----------------------------- private methods ---------------------------------------------
+
+    /**
+     * Renames one external file into {@code dir} and builds its file meta. The stats extractor is
+     * resolved before the rename so that an unsupported format leaves the file untouched.
+     */
+    private static DataFileMeta constructFileMeta(
+            String format,
+            FileStatus fileStatus,
+            FileIO fileIO,
+            Table table,
+            Path dir,
+            Map<Path, Path> rollback) {
+        try {
+            TableStatsExtractor tableStatsExtractor = createStatsExtractor(format, table);
+            Path newPath = renameFile(fileIO, fileStatus.getPath(), dir, format, rollback);
+            return constructFileMeta(
+                    newPath.getName(),
+                    fileStatus.getLen(),
+                    newPath,
+                    tableStatsExtractor,
+                    fileIO,
+                    table);
+        } catch (IOException e) {
+            // Checked exceptions cannot escape the stream pipeline in construct().
+            throw new RuntimeException("error when construct file meta", e);
+        }
+    }
+
+    /** Resolves the stats extractor for {@code format}, configured from the table's options. */
+    private static TableStatsExtractor createStatsExtractor(String format, Table table) {
+        AbstractFileStoreTable fileStoreTable = (AbstractFileStoreTable) table;
+        FieldStatsCollector.Factory[] factories =
+                StatsCollectorFactories.createStatsFactories(
+                        fileStoreTable.coreOptions(), table.rowType().getFieldNames());
+        return FileFormat.getFileFormat(fileStoreTable.coreOptions().toConfiguration(), format)
+                .createStatsExtractor(table.rowType(), factories)
+                .orElseThrow(
+                        () ->
+                                new RuntimeException(
+                                        "Can't get table stats extractor for format " + format));
+    }
+
+    /**
+     * Moves {@code originPath} into {@code newDir}, appending {@code "." + format} to the file name
+     * if it does not already end with it.
+     *
+     * @throws IOException if the rename fails or reports that it did not happen
+     */
+    private static Path renameFile(
+            FileIO fileIO, Path originPath, Path newDir, String format, Map<Path, Path> rollback)
+            throws IOException {
+        String suffix = "." + format;
+        String fileName = originPath.getName();
+        String newFileName = fileName.endsWith(suffix) ? fileName : fileName + suffix;
+        Path newPath = new Path(newDir, newFileName);
+        // FileIO#rename reports failure through its return value; ignoring it would build a file
+        // meta for a path that does not exist.
+        if (!fileIO.rename(originPath, newPath)) {
+            throw new IOException(
+                    String.format("Failed to rename file %s to %s.", originPath, newPath));
+        }
+        // Only record a rollback entry once the file has actually been moved.
+        rollback.put(newPath, originPath);
+        return newPath;
+    }
+
+    /** Builds the file meta of an already-renamed file from stats read out of the file itself. */
+    private static DataFileMeta constructFileMeta(
+            String fileName,
+            long fileSize,
+            Path path,
+            TableStatsExtractor tableStatsExtractor,
+            FileIO fileIO,
+            Table table)
+            throws IOException {
+        FieldStatsArraySerializer statsArraySerializer =
+                new FieldStatsArraySerializer(table.rowType());
+
+        Pair<FieldStats[], TableStatsExtractor.FileInfo> fileInfo =
+                tableStatsExtractor.extractWithFileInfo(fileIO, path);
+        BinaryTableStats stats = statsArraySerializer.toBinary(fileInfo.getLeft());
+
+        // NOTE(review): the two zeros are presumably the min/max sequence numbers — confirm
+        // against DataFileMeta#forAppend.
+        return DataFileMeta.forAppend(
+                fileName,
+                fileSize,
+                fileInfo.getRight().getRowCount(),
+                stats,
+                0,
+                0,
+                ((AbstractFileStoreTable) table).schema().id());
+    }
+
+    /**
+     * Serializes string partition values into a {@link BinaryRow} of {@code partitionRowType}.
+     *
+     * @param partitionRowType row type of the partition keys
+     * @param partitionValues partition values as strings, keyed by partition field name
+     * @param valueSetters one setter per partition field, in field order
+     * @return the serialized partition row
+     */
+    public static BinaryRow writePartitionValue(
+            RowType partitionRowType,
+            Map<String, String> partitionValues,
+            List<BinaryWriter.ValueSetter> valueSetters) {
+
+        BinaryRow binaryRow = new BinaryRow(partitionRowType.getFieldCount());
+        BinaryRowWriter binaryRowWriter = new BinaryRowWriter(binaryRow);
+
+        List<DataField> fields = partitionRowType.getFields();
+
+        for (int i = 0; i < fields.size(); i++) {
+            DataField field = fields.get(i);
+            Object value =
+                    TypeUtils.castFromString(partitionValues.get(field.name()), field.type());
+            valueSetters.get(i).setValue(binaryRowWriter, i, value);
+        }
+        binaryRowWriter.complete();
+        return binaryRow;
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/format/TableStatsExtractor.java 
b/paimon-core/src/main/java/org/apache/paimon/migrate/Migrator.java
similarity index 73%
copy from 
paimon-common/src/main/java/org/apache/paimon/format/TableStatsExtractor.java
copy to paimon-core/src/main/java/org/apache/paimon/migrate/Migrator.java
index e5e8e3eb0..7420528e4 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/format/TableStatsExtractor.java
+++ b/paimon-core/src/main/java/org/apache/paimon/migrate/Migrator.java
@@ -16,15 +16,10 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.format;
+package org.apache.paimon.migrate;
 
-import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.Path;
+/** Migrator interface for migrating table from other data-lake like hive, 
iceberg, hudi and etc. */
+public interface Migrator {
 
-import java.io.IOException;
-
-/** Extracts statistics directly from file. */
-public interface TableStatsExtractor {
-
-    FieldStats[] extract(FileIO fileIO, Path path) throws IOException;
+    void executeMigrate() throws Exception;
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/stats/TestTableStatsExtractor.java
 
b/paimon-core/src/test/java/org/apache/paimon/stats/TestTableStatsExtractor.java
index 530fc9b5b..7c9572653 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/stats/TestTableStatsExtractor.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/stats/TestTableStatsExtractor.java
@@ -30,6 +30,7 @@ import org.apache.paimon.statistics.FieldStatsCollector;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileUtils;
 import org.apache.paimon.utils.ObjectSerializer;
+import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.Preconditions;
 
 import java.io.IOException;
@@ -57,6 +58,12 @@ public class TestTableStatsExtractor implements 
TableStatsExtractor {
 
     @Override
     public FieldStats[] extract(FileIO fileIO, Path path) throws IOException {
+        return extractWithFileInfo(fileIO, path).getLeft();
+    }
+
+    @Override
+    public Pair<FieldStats[], FileInfo> extractWithFileInfo(FileIO fileIO, 
Path path)
+            throws IOException {
         IdentityObjectSerializer serializer = new 
IdentityObjectSerializer(rowType);
         FormatReaderFactory readerFactory = 
format.createReaderFactory(rowType);
         List<InternalRow> records =
@@ -66,7 +73,7 @@ public class TestTableStatsExtractor implements 
TableStatsExtractor {
         for (InternalRow record : records) {
             statsCollector.collect(record);
         }
-        return statsCollector.extract();
+        return Pair.of(statsCollector.extract(), new FileInfo(records.size()));
     }
 
     private static class IdentityObjectSerializer extends 
ObjectSerializer<InternalRow> {
diff --git a/paimon-flink/paimon-flink-common/pom.xml 
b/paimon-flink/paimon-flink-common/pom.xml
index e76526445..c5a7d32a7 100644
--- a/paimon-flink/paimon-flink-common/pom.xml
+++ b/paimon-flink/paimon-flink-common/pom.xml
@@ -35,6 +35,7 @@ under the License.
 
     <properties>
         <flink.version>1.18.0</flink.version>
+        <hive.version>2.3.9</hive.version>
     </properties>
 
     <dependencies>
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index 84c628f9e..9a180c678 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -349,12 +349,9 @@ public class FlinkCatalog extends AbstractCatalog {
         }
 
         // remove table path
-        String specific = options.remove(PATH.key());
-        if (specific != null || logStoreAutoRegister) {
-            catalogTable = catalogTable.copy(options);
-        }
+        options.remove(PATH.key());
 
-        return fromCatalogTable(catalogTable);
+        return fromCatalogTable(catalogTable.copy(options));
     }
 
     private List<SchemaChange> toSchemaChange(
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableAction.java
new file mode 100644
index 000000000..ae7ff0216
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableAction.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.flink.procedure.MigrateTableProcedure;
+
+import org.apache.flink.table.procedure.DefaultProcedureContext;
+
+import java.util.Map;
+
+/**
+ * Flink action that upgrades an external table into a Paimon table by running {@link
+ * MigrateTableProcedure} (which currently only accepts a Hive catalog).
+ */
+public class MigrateTableAction extends ActionBase {
+
+    private final String sourceTableType;
+    private final String sourceTableId;
+    private final String tableConf;
+
+    /**
+     * @param connector type of the source table, e.g. {@code hive}
+     * @param warehouse Paimon warehouse path
+     * @param hiveTableFullName source table identifier in {@code database.table} form
+     * @param catalogConfig additional catalog options
+     * @param tableProperties comma-separated {@code key=value} options for the migrated table
+     */
+    public MigrateTableAction(
+            String connector,
+            String warehouse,
+            String hiveTableFullName,
+            Map<String, String> catalogConfig,
+            String tableProperties) {
+        super(warehouse, catalogConfig);
+        this.sourceTableType = connector;
+        this.sourceTableId = hiveTableFullName;
+        this.tableConf = tableProperties;
+    }
+
+    /** Executes the migration by delegating to {@link MigrateTableProcedure}. */
+    @Override
+    public void run() throws Exception {
+        MigrateTableProcedure procedure = new MigrateTableProcedure();
+        procedure.withCatalog(catalog);
+        DefaultProcedureContext context = new DefaultProcedureContext(env);
+        procedure.call(context, sourceTableType, sourceTableId, tableConf);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableActionFactory.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableActionFactory.java
new file mode 100644
index 000000000..9efbe6cf4
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableActionFactory.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.flink.api.java.utils.MultipleParameterTool;
+
+import java.util.Map;
+import java.util.Optional;
+
+/** Action Factory for {@link MigrateTableAction}. */
+public class MigrateTableActionFactory implements ActionFactory {
+
+    public static final String IDENTIFIER = "migrate_table";
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+
+    /**
+     * Builds a {@link MigrateTableAction} from the parsed command line.
+     *
+     * <p>Recognized arguments: {@code --warehouse}, {@code --source-table-type}, {@code
+     * --source-table-id}, {@code --catalog-conf key=value} and {@code --table-properties}, a single
+     * comma-separated {@code key=value} list that is handed to the action as-is. Absent scalar
+     * arguments are passed on as {@code null}.
+     */
+    @Override
+    public Optional<Action> create(MultipleParameterTool params) {
+        String warehouse = params.get("warehouse");
+        String connector = params.get("source-table-type");
+        String sourceHiveTable = params.get("source-table-id");
+        Map<String, String> catalogConfig = optionalConfigMap(params, "catalog-conf");
+        String tableConf = params.get("table-properties");
+
+        MigrateTableAction migrateTableAction =
+                new MigrateTableAction(
+                        connector, warehouse, sourceHiveTable, catalogConfig, tableConf);
+        return Optional.of(migrateTableAction);
+    }
+
+    /** Prints the usage of this action; the syntax line uses {@link #IDENTIFIER} as action name. */
+    @Override
+    public void printHelp() {
+        System.out.println("Action \"migrate_table\" runs a migrating job from hive to paimon.");
+        System.out.println();
+
+        System.out.println("Syntax:");
+        System.out.println(
+                "  "
+                        + IDENTIFIER
+                        + " --warehouse <warehouse-path> --source-table-type hive "
+                        + "--source-table-id <database.table_name> "
+                        + "[--catalog-conf <key>=<value>] "
+                        + "[--table-properties <key>=<value>,<key>=<value>,...]");
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java
new file mode 100644
index 000000000..c3e197291
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.utils.TableMigrationUtils;
+import org.apache.paimon.hive.HiveCatalog;
+
+import org.apache.flink.table.procedure.ProcedureContext;
+
+import java.util.Collections;
+
+/** Add file procedure to add file from hive to paimon. */
+public class MigrateFileProcedure extends ProcedureBase {
+
+    @Override
+    public String identifier() {
+        return "migrate_file";
+    }
+
+    public String[] call(
+            ProcedureContext procedureContext,
+            String connector,
+            String sourceTablePath,
+            String targetPaimonTablePath)
+            throws Exception {
+        if (!(catalog instanceof HiveCatalog)) {
+            throw new IllegalArgumentException("Only support Hive Catalog");
+        }
+        Identifier sourceTableId = Identifier.fromString(sourceTablePath);
+        Identifier targetTableId = 
Identifier.fromString(targetPaimonTablePath);
+
+        if (!(catalog.tableExists(targetTableId))) {
+            throw new IllegalArgumentException(
+                    "Target paimon table does not exist: " + 
targetPaimonTablePath);
+        }
+
+        TableMigrationUtils.getImporter(
+                        connector,
+                        (HiveCatalog) catalog,
+                        sourceTableId.getDatabaseName(),
+                        sourceTableId.getObjectName(),
+                        targetTableId.getDatabaseName(),
+                        targetTableId.getObjectName(),
+                        Collections.emptyMap())
+                .executeMigrate();
+        return new String[] {"Success"};
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java
new file mode 100644
index 000000000..31b86dbb6
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.utils.TableMigrationUtils;
+import org.apache.paimon.hive.HiveCatalog;
+import org.apache.paimon.utils.ParameterUtils;
+
+import org.apache.flink.table.procedure.ProcedureContext;
+
+/** Migrate procedure to migrate hive table to paimon table. */
+public class MigrateTableProcedure extends ProcedureBase {
+
+    private static final String PAIMON_SUFFIX = "_paimon_";
+
+    @Override
+    public String identifier() {
+        return "migrate_table";
+    }
+
+    public String[] call(
+            ProcedureContext procedureContext, String connector, String 
sourceTablePath)
+            throws Exception {
+        return call(procedureContext, connector, sourceTablePath, "");
+    }
+
+    public String[] call(
+            ProcedureContext procedureContext,
+            String connector,
+            String sourceTablePath,
+            String properties)
+            throws Exception {
+        if (!(catalog instanceof HiveCatalog)) {
+            throw new IllegalArgumentException("Only support Hive Catalog");
+        }
+
+        String targetPaimonTablePath = sourceTablePath + PAIMON_SUFFIX;
+
+        Identifier sourceTableId = Identifier.fromString(sourceTablePath);
+        Identifier targetTableId = 
Identifier.fromString(targetPaimonTablePath);
+
+        TableMigrationUtils.getImporter(
+                        connector,
+                        (HiveCatalog) catalog,
+                        sourceTableId.getDatabaseName(),
+                        sourceTableId.getObjectName(),
+                        targetTableId.getDatabaseName(),
+                        targetTableId.getObjectName(),
+                        
ParameterUtils.parseCommaSeparatedKeyValues(properties))
+                .executeMigrate();
+
+        catalog.renameTable(targetTableId, sourceTableId, false);
+        return new String[] {"Success"};
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java
index 17490cb4a..5f71d53c2 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java
@@ -41,7 +41,7 @@ public abstract class ProcedureBase implements Procedure, 
Factory {
 
     protected Catalog catalog;
 
-    ProcedureBase withCatalog(Catalog catalog) {
+    public ProcedureBase withCatalog(Catalog catalog) {
         this.catalog = catalog;
         return this;
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java
new file mode 100644
index 000000000..32f263ab3
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.utils;
+
+import org.apache.paimon.hive.HiveCatalog;
+import org.apache.paimon.hive.migrate.HiveMigrator;
+import org.apache.paimon.migrate.Migrator;
+
+import java.util.Map;
+
+/** Migration util to choose importer according to connector. */
+public class TableMigrationUtils {
+
+    public static Migrator getImporter(
+            String connector,
+            HiveCatalog paimonCatalog,
+            String sourceDatabase,
+            String souceTableName,
+            String targetDatabase,
+            String targetTableName,
+            Map<String, String> options) {
+        switch (connector) {
+            case "hive":
+                return new HiveMigrator(
+                        paimonCatalog,
+                        sourceDatabase,
+                        souceTableName,
+                        targetDatabase,
+                        targetTableName,
+                        options);
+            default:
+                throw new UnsupportedOperationException("Don't support 
connector " + connector);
+        }
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
 
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 1196880bf..bac5cdd83 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ 
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -23,6 +23,7 @@ org.apache.paimon.flink.action.RollbackToActionFactory
 org.apache.paimon.flink.action.CreateTagActionFactory
 org.apache.paimon.flink.action.DeleteTagActionFactory
 org.apache.paimon.flink.action.ResetConsumerActionFactory
+org.apache.paimon.flink.action.MigrateTableActionFactory
 
 ### procedure factories
 org.apache.paimon.flink.procedure.CompactDatabaseProcedure
@@ -33,3 +34,5 @@ org.apache.paimon.flink.procedure.DropPartitionProcedure
 org.apache.paimon.flink.procedure.MergeIntoProcedure
 org.apache.paimon.flink.procedure.ResetConsumerProcedure
 org.apache.paimon.flink.procedure.RollbackToProcedure
+org.apache.paimon.flink.procedure.MigrateTableProcedure
+org.apache.paimon.flink.procedure.MigrateFileProcedure
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/orc/filter/OrcTableStatsExtractor.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/orc/filter/OrcTableStatsExtractor.java
index 588e04f92..a4e7acac9 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/orc/filter/OrcTableStatsExtractor.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/orc/filter/OrcTableStatsExtractor.java
@@ -31,6 +31,7 @@ import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DecimalType;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.DateTimeUtils;
+import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.Preconditions;
 
 import org.apache.hadoop.conf.Configuration;
@@ -66,6 +67,12 @@ public class OrcTableStatsExtractor implements 
TableStatsExtractor {
 
     @Override
     public FieldStats[] extract(FileIO fileIO, Path path) throws IOException {
+        return extractWithFileInfo(fileIO, path).getLeft();
+    }
+
+    @Override
+    public Pair<FieldStats[], FileInfo> extractWithFileInfo(FileIO fileIO, 
Path path)
+            throws IOException {
         try (Reader reader = OrcReaderFactory.createReader(new 
Configuration(), fileIO, path)) {
             long rowCount = reader.getNumberOfRows();
             ColumnStatistics[] columnStatistics = reader.getStatistics();
@@ -76,16 +83,26 @@ public class OrcTableStatsExtractor implements 
TableStatsExtractor {
 
             FieldStatsCollector[] collectors = 
FieldStatsCollector.create(statsCollectors);
 
-            return IntStream.range(0, rowType.getFieldCount())
-                    .mapToObj(
-                            i -> {
-                                DataField field = rowType.getFields().get(i);
-                                int fieldIdx = 
columnNames.indexOf(field.name());
-                                int colId = columnTypes.get(fieldIdx).getId();
-                                return toFieldStats(
-                                        field, columnStatistics[colId], 
rowCount, collectors[i]);
-                            })
-                    .toArray(FieldStats[]::new);
+            return Pair.of(
+                    IntStream.range(0, rowType.getFieldCount())
+                            .mapToObj(
+                                    i -> {
+                                        DataField field = 
rowType.getFields().get(i);
+                                        int fieldIdx = 
columnNames.indexOf(field.name());
+                                        if (fieldIdx == -1) {
+                                            return collectors[i].convert(
+                                                    new FieldStats(null, null, 
null));
+                                        } else {
+                                            int colId = 
columnTypes.get(fieldIdx).getId();
+                                            return toFieldStats(
+                                                    field,
+                                                    columnStatistics[colId],
+                                                    rowCount,
+                                                    collectors[i]);
+                                        }
+                                    })
+                            .toArray(FieldStats[]::new),
+                    new FileInfo(rowCount));
         }
     }
 
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetTableStatsExtractor.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetTableStatsExtractor.java
index e8836b8fe..9055f64b0 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetTableStatsExtractor.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetTableStatsExtractor.java
@@ -31,6 +31,7 @@ import org.apache.paimon.types.DecimalType;
 import org.apache.paimon.types.LocalZonedTimestampType;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.Preconditions;
 
 import org.apache.parquet.column.statistics.BinaryStatistics;
@@ -67,15 +68,27 @@ public class ParquetTableStatsExtractor implements 
TableStatsExtractor {
 
     @Override
     public FieldStats[] extract(FileIO fileIO, Path path) throws IOException {
-        Map<String, Statistics<?>> stats = 
ParquetUtil.extractColumnStats(fileIO, path);
+        return extractWithFileInfo(fileIO, path).getLeft();
+    }
+
+    @Override
+    public Pair<FieldStats[], FileInfo> extractWithFileInfo(FileIO fileIO, 
Path path)
+            throws IOException {
+        Pair<Map<String, Statistics<?>>, FileInfo> statsPair =
+                ParquetUtil.extractColumnStats(fileIO, path);
         FieldStatsCollector[] collectors = 
FieldStatsCollector.create(statsCollectors);
-        return IntStream.range(0, rowType.getFieldCount())
-                .mapToObj(
-                        i -> {
-                            DataField field = rowType.getFields().get(i);
-                            return toFieldStats(field, 
stats.get(field.name()), collectors[i]);
-                        })
-                .toArray(FieldStats[]::new);
+        return Pair.of(
+                IntStream.range(0, rowType.getFieldCount())
+                        .mapToObj(
+                                i -> {
+                                    DataField field = 
rowType.getFields().get(i);
+                                    return toFieldStats(
+                                            field,
+                                            
statsPair.getLeft().get(field.name()),
+                                            collectors[i]);
+                                })
+                        .toArray(FieldStats[]::new),
+                statsPair.getRight());
     }
 
     private FieldStats toFieldStats(
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetUtil.java 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetUtil.java
index b7aa5d956..05650da94 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetUtil.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetUtil.java
@@ -18,9 +18,11 @@
 
 package org.apache.paimon.format.parquet;
 
+import org.apache.paimon.format.TableStatsExtractor;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.types.DataField;
+import org.apache.paimon.utils.Pair;
 
 import org.apache.parquet.ParquetReadOptions;
 import org.apache.parquet.column.statistics.Statistics;
@@ -44,8 +46,8 @@ public class ParquetUtil {
      * @return result sets as map, key is column name, value is statistics 
(for example, null count,
      *     minimum value, maximum value)
      */
-    public static Map<String, Statistics<?>> extractColumnStats(FileIO fileIO, 
Path path)
-            throws IOException {
+    public static Pair<Map<String, Statistics<?>>, 
TableStatsExtractor.FileInfo> extractColumnStats(
+            FileIO fileIO, Path path) throws IOException {
         try (ParquetFileReader reader = getParquetReader(fileIO, path)) {
             ParquetMetadata parquetMetadata = reader.getFooter();
             List<BlockMetaData> blockMetaDataList = 
parquetMetadata.getBlocks();
@@ -65,7 +67,7 @@ public class ParquetUtil {
                     resultStats.put(columnName, midStats);
                 }
             }
-            return resultStats;
+            return Pair.of(resultStats, new 
TableStatsExtractor.FileInfo(reader.getRecordCount()));
         }
     }
 
diff --git 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index 987ec2c7c..60aef7942 100644
--- 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -58,6 +58,7 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -515,7 +516,7 @@ public class HiveCatalog extends AbstractCatalog {
     }
 
     @VisibleForTesting
-    IMetaStoreClient getHmsClient() {
+    public IMetaStoreClient getHmsClient() {
         return client;
     }
 
@@ -585,7 +586,6 @@ public class HiveCatalog extends AbstractCatalog {
             HiveConf.setLoadMetastoreConfig(false);
             HiveConf.setLoadHiveServer2Config(false);
             HiveConf hiveConf = new HiveConf(hadoopConf, HiveConf.class);
-
             org.apache.hadoop.fs.Path hiveSite =
                     new org.apache.hadoop.fs.Path(hiveConfDir, HIVE_SITE_FILE);
             if (!hiveSite.toUri().isAbsolute()) {
@@ -603,7 +603,15 @@ public class HiveCatalog extends AbstractCatalog {
 
             return hiveConf;
         } else {
-            return new HiveConf(hadoopConf, HiveConf.class);
+            HiveConf hiveConf = new HiveConf(hadoopConf, HiveConf.class);
+            // user doesn't provide hive conf dir, we try to find it in 
classpath
+            URL hiveSite =
+                    
Thread.currentThread().getContextClassLoader().getResource(HIVE_SITE_FILE);
+            if (hiveSite != null) {
+                LOG.info("Found {} in classpath: {}", HIVE_SITE_FILE, 
hiveSite);
+                hiveConf.addResource(hiveSite);
+            }
+            return hiveConf;
         }
     }
 
diff --git 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java
 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java
new file mode 100644
index 000000000..3a3b8f5ae
--- /dev/null
+++ 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.hive.migrate;
+
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryWriter;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.hive.HiveCatalog;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.migrate.FileMetaUtils;
import org.apache.paimon.migrate.Migrator;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.AbstractFileStoreTable;
import org.apache.paimon.table.AppendOnlyFileStoreTable;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;

import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.PrimaryKeysRequest;
import org.apache.hadoop.hive.metastore.api.Table;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.apache.paimon.hive.HiveTypeUtils.toPaimonType;
import static org.apache.paimon.utils.FileUtils.COMMON_IO_FORK_JOIN_POOL;
+
+/** Migrate hive table to paimon table. */
+public class HiveMigrator implements Migrator {
+
+    private static final Predicate<FileStatus> HIDDEN_PATH_FILTER =
+            p -> !p.getPath().getName().startsWith("_") && 
!p.getPath().getName().startsWith(".");
+
+    private final FileIO fileIO;
+    private final HiveCatalog hiveCatalog;
+    private final IMetaStoreClient client;
+    private final String sourceDatabase;
+    private final String sourceTable;
+    private final String targetDatabase;
+    private final String targetTable;
+    private final Map<String, String> options;
+
+    public HiveMigrator(
+            HiveCatalog hiveCatalog,
+            String sourceDatabase,
+            String sourceTable,
+            String targetDatabase,
+            String targetTable,
+            Map<String, String> options) {
+        this.hiveCatalog = hiveCatalog;
+        this.fileIO = hiveCatalog.fileIO();
+        this.client = hiveCatalog.getHmsClient();
+        this.sourceDatabase = sourceDatabase;
+        this.sourceTable = sourceTable;
+        this.targetDatabase = targetDatabase;
+        this.targetTable = targetTable;
+        this.options = options;
+    }
+
+    public void executeMigrate() throws Exception {
+        if (!client.tableExists(sourceDatabase, sourceTable)) {
+            throw new RuntimeException("Source hive table does not exist");
+        }
+
+        Table sourceHiveTable = client.getTable(sourceDatabase, sourceTable);
+        Map<String, String> properties = new 
HashMap<>(sourceHiveTable.getParameters());
+        checkPrimaryKey();
+
+        AbstractFileStoreTable paimonTable =
+                createPaimonTableIfNotExists(
+                        client.getSchema(sourceDatabase, sourceTable),
+                        sourceHiveTable.getPartitionKeys(),
+                        properties);
+        checkPaimonTable(paimonTable);
+
+        List<String> partitionsNames =
+                client.listPartitionNames(sourceDatabase, sourceTable, 
Short.MAX_VALUE);
+        checkCompatible(sourceHiveTable, paimonTable);
+
+        List<MigrateTask> tasks = new ArrayList<>();
+        Map<Path, Path> rollBack = new ConcurrentHashMap<>();
+        if (partitionsNames.isEmpty()) {
+            tasks.add(importUnPartitionedTableTask(fileIO, sourceHiveTable, 
paimonTable, rollBack));
+        } else {
+            tasks.addAll(
+                    importPartitionedTableTask(
+                            client,
+                            fileIO,
+                            partitionsNames,
+                            sourceHiveTable,
+                            paimonTable,
+                            rollBack));
+        }
+
+        Queue<CommitMessage> commitMessages = new LinkedBlockingQueue<>();
+        List<Future<?>> futures = new ArrayList<>();
+        tasks.forEach(
+                task ->
+                        futures.add(
+                                COMMON_IO_FORK_JOIN_POOL.submit(
+                                        () -> 
commitMessages.add(task.get()))));
+
+        try {
+            for (Future<?> future : futures) {
+                future.get();
+            }
+        } catch (Exception e) {
+            futures.forEach(f -> f.cancel(true));
+            for (Future<?> future : futures) {
+                // wait all task cancelled or finished
+                while (!future.isDone()) {
+                    Thread.sleep(100);
+                }
+            }
+            // roll back all renamed path
+            for (Map.Entry<Path, Path> entry : rollBack.entrySet()) {
+                Path newPath = entry.getKey();
+                Path origin = entry.getValue();
+                if (fileIO.exists(newPath)) {
+                    fileIO.rename(newPath, origin);
+                }
+            }
+
+            throw new RuntimeException("Migrating failed because exception 
happens", e);
+        }
+
+        paimonTable.newBatchWriteBuilder().newCommit().commit(new 
ArrayList<>(commitMessages));
+        client.dropTable(sourceDatabase, sourceTable, true, true);
+    }
+
+    private void checkPrimaryKey() throws Exception {
+        PrimaryKeysRequest primaryKeysRequest = new 
PrimaryKeysRequest(sourceDatabase, sourceTable);
+        if (!client.getPrimaryKeys(primaryKeysRequest).isEmpty()) {
+            throw new IllegalArgumentException("Can't migrate primary key 
table yet.");
+        }
+    }
+
+    private void checkPaimonTable(AbstractFileStoreTable paimonTable) {
+        if (!(paimonTable instanceof AppendOnlyFileStoreTable)) {
+            throw new IllegalArgumentException(
+                    "Hive migrator only support append only table target 
table");
+        }
+
+        if (paimonTable.store().bucketMode() != BucketMode.UNAWARE) {
+            throw new IllegalArgumentException(
+                    "Hive migrator only support unaware-bucket target table");
+        }
+    }
+
+    private AbstractFileStoreTable createPaimonTableIfNotExists(
+            List<FieldSchema> fields,
+            List<FieldSchema> partitionFields,
+            Map<String, String> hiveTableOptions)
+            throws Exception {
+
+        Identifier identifier = Identifier.create(targetDatabase, targetTable);
+        if (!hiveCatalog.tableExists(identifier)) {
+            Schema schema = from(fields, partitionFields, hiveTableOptions);
+            hiveCatalog.createTable(identifier, schema, false);
+        }
+        return (AbstractFileStoreTable) hiveCatalog.getTable(identifier);
+    }
+
+    public Schema from(
+            List<FieldSchema> fields,
+            List<FieldSchema> partitionFields,
+            Map<String, String> hiveTableOptions) {
+        HashMap<String, String> paimonOptions = new HashMap<>(this.options);
+        paimonOptions.put(CoreOptions.BUCKET.key(), "-1");
+
+        Schema.Builder schemaBuilder =
+                Schema.newBuilder()
+                        .comment(hiveTableOptions.get("comment"))
+                        .options(paimonOptions)
+                        .partitionKeys(
+                                partitionFields.stream()
+                                        .map(FieldSchema::getName)
+                                        .collect(Collectors.toList()));
+
+        fields.forEach(
+                field ->
+                        schemaBuilder.column(
+                                field.getName(),
+                                toPaimonType(field.getType()),
+                                field.getComment()));
+
+        return schemaBuilder.build();
+    }
+
+    private List<MigrateTask> importPartitionedTableTask(
+            IMetaStoreClient client,
+            FileIO fileIO,
+            List<String> partitionNames,
+            Table sourceTable,
+            AbstractFileStoreTable paimonTable,
+            Map<Path, Path> rollback)
+            throws Exception {
+        List<MigrateTask> migrateTasks = new ArrayList<>();
+        List<BinaryWriter.ValueSetter> valueSetters = new ArrayList<>();
+
+        RowType partitionRowType =
+                
paimonTable.schema().projectedLogicalRowType(paimonTable.schema().partitionKeys());
+
+        partitionRowType
+                .getFieldTypes()
+                .forEach(type -> 
valueSetters.add(BinaryWriter.createValueSetter(type)));
+
+        for (String partitionName : partitionNames) {
+            Partition partition =
+                    client.getPartition(
+                            sourceTable.getDbName(), 
sourceTable.getTableName(), partitionName);
+            Map<String, String> values = 
client.partitionNameToSpec(partitionName);
+            String format = 
parseFormat(partition.getSd().getSerdeInfo().toString());
+            String location = partition.getSd().getLocation();
+            BinaryRow partitionRow =
+                    FileMetaUtils.writePartitionValue(partitionRowType, 
values, valueSetters);
+            Path path = 
paimonTable.store().pathFactory().bucketPath(partitionRow, 0);
+
+            migrateTasks.add(
+                    new MigrateTask(
+                            fileIO, format, location, paimonTable, 
partitionRow, path, rollback));
+        }
+        return migrateTasks;
+    }
+
+    public MigrateTask importUnPartitionedTableTask(
+            FileIO fileIO,
+            Table sourceTable,
+            AbstractFileStoreTable paimonTable,
+            Map<Path, Path> rollback) {
+        String format = 
parseFormat(sourceTable.getSd().getSerdeInfo().toString());
+        String location = sourceTable.getSd().getLocation();
+        Path path = 
paimonTable.store().pathFactory().bucketPath(BinaryRow.EMPTY_ROW, 0);
+        return new MigrateTask(
+                fileIO, format, location, paimonTable, BinaryRow.EMPTY_ROW, 
path, rollback);
+    }
+
+    private void checkCompatible(Table sourceHiveTable, AbstractFileStoreTable 
paimonTable) {
+        List<FieldSchema> sourceFields = new 
ArrayList<>(sourceHiveTable.getPartitionKeys());
+        List<DataField> targetFields =
+                new ArrayList<>(
+                        paimonTable
+                                .schema()
+                                
.projectedLogicalRowType(paimonTable.partitionKeys())
+                                .getFields());
+
+        if (sourceFields.size() != targetFields.size()) {
+            throw new RuntimeException(
+                    "Source table partition keys not match target table 
partition keys.");
+        }
+
+        sourceFields.sort(Comparator.comparing(FieldSchema::getName));
+        targetFields.sort(Comparator.comparing(DataField::name));
+
+        for (int i = 0; i < sourceFields.size(); i++) {
+            FieldSchema s = sourceFields.get(i);
+            DataField t = targetFields.get(i);
+
+            if (!s.getName().equals(t.name())
+                    || !s.getType().equalsIgnoreCase(t.type().asSQLString())) {
+                throw new RuntimeException(
+                        "Source table partition keys not match target table 
partition keys, please checkCompatible.");
+            }
+        }
+    }
+
+    private String parseFormat(String serder) {
+        if (serder.contains("avro")) {
+            throw new UnsupportedOperationException("Can't support format avro 
yet.");
+        } else if (serder.contains("parquet")) {
+            return "parquet";
+        } else if (serder.contains("orc")) {
+            return "orc";
+        } else {
+            throw new UnsupportedOperationException("Unknown partition format: 
" + serder);
+        }
+    }
+
+    /** One import task for one partition. */
+    public static class MigrateTask implements Supplier<CommitMessage> {
+
+        private final FileIO fileIO;
+        private final String format;
+        private final String location;
+        private final AbstractFileStoreTable paimonTable;
+        private final BinaryRow partitionRow;
+        private final Path newDir;
+        private final Map<Path, Path> rollback;
+
+        public MigrateTask(
+                FileIO fileIO,
+                String format,
+                String location,
+                AbstractFileStoreTable paimonTable,
+                BinaryRow partitionRow,
+                Path newDir,
+                Map<Path, Path> rollback) {
+            this.fileIO = fileIO;
+            this.format = format;
+            this.location = location;
+            this.paimonTable = paimonTable;
+            this.partitionRow = partitionRow;
+            this.newDir = newDir;
+            this.rollback = rollback;
+        }
+
+        @Override
+        public CommitMessage get() {
+            try {
+                List<DataFileMeta> fileMetas =
+                        FileMetaUtils.construct(
+                                fileIO,
+                                format,
+                                location,
+                                paimonTable,
+                                HIDDEN_PATH_FILTER,
+                                newDir,
+                                rollback);
+                return FileMetaUtils.commitFile(partitionRow, fileMetas);
+            } catch (IOException e) {
+                throw new RuntimeException("Can't get commit message", e);
+            }
+        }
+    }
+}
diff --git 
a/paimon-hive/paimon-hive-common/src/test/java/org/apache/paimon/hive/TestHiveMetastore.java
 
b/paimon-hive/paimon-hive-common/src/test/java/org/apache/paimon/hive/TestHiveMetastore.java
index 16520281c..5b6ea9933 100644
--- 
a/paimon-hive/paimon-hive-common/src/test/java/org/apache/paimon/hive/TestHiveMetastore.java
+++ 
b/paimon-hive/paimon-hive-common/src/test/java/org/apache/paimon/hive/TestHiveMetastore.java
@@ -62,27 +62,31 @@ import static 
java.nio.file.attribute.PosixFilePermissions.fromString;
  */
 public class TestHiveMetastore {
 
-    private static final int DEFAULT_POOL_SIZE = 5;
+    private static final int DEFAULT_POOL_SIZE = 15;
 
     // It's tricky to clear all static fields in an HMS instance in order to 
switch derby root dir.
     // Therefore, we reuse the same derby root between tests and remove it 
after JVM exits.
-    private static final File HIVE_LOCAL_DIR;
-    private static final String DERBY_PATH;
+    private static File hiveLocalDir;
+    private static String derbyPath;
 
     static {
+        setup();
+    }
+
+    private static void setup() {
         try {
-            HIVE_LOCAL_DIR =
+            hiveLocalDir =
                     createTempDirectory("hive", 
asFileAttribute(fromString("rwxrwxrwx"))).toFile();
-            DERBY_PATH = new File(HIVE_LOCAL_DIR, "metastore_db").getPath();
-            File derbyLogFile = new File(HIVE_LOCAL_DIR, "derby.log");
+            derbyPath = new File(hiveLocalDir, "metastore_db").getPath();
+            File derbyLogFile = new File(hiveLocalDir, "derby.log");
             System.setProperty("derby.stream.error.file", 
derbyLogFile.getAbsolutePath());
-            setupMetastoreDB("jdbc:derby:" + DERBY_PATH + ";create=true");
+            setupMetastoreDB("jdbc:derby:" + derbyPath + ";create=true");
             Runtime.getRuntime()
                     .addShutdownHook(
                             new Thread(
                                     () -> {
                                         Path localDirPath =
-                                                new 
Path(HIVE_LOCAL_DIR.getAbsolutePath());
+                                                new 
Path(hiveLocalDir.getAbsolutePath());
                                         FileSystem fs;
                                         try {
                                             fs =
@@ -110,12 +114,20 @@ public class TestHiveMetastore {
     private TServer server;
     private HiveMetaStore.HMSHandler baseHandler;
 
+    /**
+     * Starts a TestHiveMetastore on {@code port} with the default connection pool size and the
+     * default HiveConf.
+     */
+    public void start(int port) {
+        start(new HiveConf(new Configuration(), TestHiveMetastore.class), DEFAULT_POOL_SIZE, port);
+    }
+
     /**
-     * Starts a TestHiveMetastore with the default connection pool size (5) and the default
-     * HiveConf.
+     * Starts a TestHiveMetastore on port 9083 with the default connection pool size and the
+     * default HiveConf.
      */
     public void start() {
-        start(new HiveConf(new Configuration(), TestHiveMetastore.class), DEFAULT_POOL_SIZE);
+        start(new HiveConf(new Configuration(), TestHiveMetastore.class), DEFAULT_POOL_SIZE, 9083);
     }
 
     /**
@@ -124,9 +136,9 @@ public class TestHiveMetastore {
      * @param conf The hive configuration to use
      * @param poolSize The number of threads in the executor pool
      */
-    public void start(HiveConf conf, int poolSize) {
+    public void start(HiveConf conf, int poolSize, int portNum) {
         try {
-            TServerSocket socket = new TServerSocket(9083);
+            TServerSocket socket = new TServerSocket(portNum);
             int port = socket.getServerSocket().getLocalPort();
             initConf(conf, port);
 
@@ -141,6 +153,7 @@ public class TestHiveMetastore {
             System.setProperty(
                     HiveConf.ConfVars.METASTOREURIS.varname,
                     hiveConf.getVar(HiveConf.ConfVars.METASTOREURIS));
+            System.setProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, 
warehouseDir());
         } catch (Exception e) {
             throw new RuntimeException("Cannot start TestHiveMetastore", e);
         }
@@ -157,10 +170,13 @@ public class TestHiveMetastore {
         if (baseHandler != null) {
             baseHandler.shutdown();
         }
+        System.clearProperty(HiveConf.ConfVars.METASTOREURIS.varname);
+        System.clearProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname);
     }
 
     public void reset() throws Exception {
-        Path warehouseRoot = new Path(HIVE_LOCAL_DIR.getAbsolutePath());
+        setup();
+        Path warehouseRoot = new Path(hiveLocalDir.getAbsolutePath());
         FileSystem fs = FileSystem.get(warehouseRoot.toUri(), hiveConf);
         for (FileStatus fileStatus : fs.listStatus(warehouseRoot)) {
             if (!fileStatus.getPath().getName().equals("derby.log")
@@ -170,12 +186,16 @@ public class TestHiveMetastore {
         }
     }
 
+    private static String warehouseDir() {
+        return "file:".concat(hiveLocalDir.getAbsolutePath());
+    }
+
     private TServer newThriftServer(TServerSocket socket, int poolSize, 
HiveConf conf)
             throws Exception {
         HiveConf serverConf = new HiveConf(conf);
         serverConf.set(
                 HiveConf.ConfVars.METASTORECONNECTURLKEY.varname,
-                "jdbc:derby:" + DERBY_PATH + ";create=true");
+                "jdbc:derby:" + derbyPath + ";create=true");
         baseHandler = new HiveMetaStore.HMSHandler("new db based metaserver", 
serverConf);
         IHMSHandler handler = RetryingHMSHandler.getProxy(serverConf, 
baseHandler, false);
 
@@ -192,9 +212,7 @@ public class TestHiveMetastore {
 
     private void initConf(HiveConf conf, int port) {
         conf.set(HiveConf.ConfVars.METASTOREURIS.varname, 
"thrift://localhost:" + port);
-        conf.set(
-                HiveConf.ConfVars.METASTOREWAREHOUSE.varname,
-                "file:" + HIVE_LOCAL_DIR.getAbsolutePath());
+        conf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, warehouseDir());
         conf.set(HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL.varname, "false");
         conf.set(
                 
HiveConf.ConfVars.METASTORE_DISALLOW_INCOMPATIBLE_COL_TYPE_CHANGES.varname,
diff --git a/paimon-hive/paimon-hive-connector-common/pom.xml 
b/paimon-hive/paimon-hive-connector-common/pom.xml
index b6a84361d..0f1e86cdc 100644
--- a/paimon-hive/paimon-hive-connector-common/pom.xml
+++ b/paimon-hive/paimon-hive-connector-common/pom.xml
@@ -115,6 +115,22 @@ under the License.
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-flink-common</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-hive-common</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.paimon</groupId>
             <artifactId>paimon-hive-catalog</artifactId>
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureTest.java
 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureTest.java
new file mode 100644
index 000000000..7fa93a9b7
--- /dev/null
+++ 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.hive.procedure;
+
+import org.apache.paimon.flink.action.ActionITCaseBase;
+import org.apache.paimon.flink.procedure.MigrateFileProcedure;
+import org.apache.paimon.hive.TestHiveMetastore;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.SqlDialect;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Tests for {@link MigrateFileProcedure}, which migrates the files of a Hive table into an
+ * existing Paimon table.
+ */
+public class MigrateFileProcedureTest extends ActionITCaseBase {
+
+    private static final TestHiveMetastore TEST_HIVE_METASTORE = new TestHiveMetastore();
+
+    private static final int PORT = 9085;
+
+    @BeforeEach
+    public void beforeEach() {
+        TEST_HIVE_METASTORE.start(PORT);
+    }
+
+    @AfterEach
+    public void afterEach() throws Exception {
+        TEST_HIVE_METASTORE.stop();
+    }
+
+    @Test
+    public void testOrc() throws Exception {
+        testMigrateFile("orc");
+    }
+
+    @Test
+    public void testParquet() throws Exception {
+        testMigrateFile("parquet");
+    }
+
+    /**
+     * Creates a Hive table partitioned by (id2, id3) in the given file format, migrates its files
+     * into a new Paimon table with {@code CALL sys.migrate_file}, and checks that the Paimon table
+     * returns exactly the rows that were read from the Hive table before the migration.
+     */
+    private void testMigrateFile(String format) throws Exception {
+        StreamExecutionEnvironment env = buildDefaultEnv(false);
+
+        TableEnvironment tEnv =
+                StreamTableEnvironment.create(env, EnvironmentSettings.inBatchMode());
+        tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')");
+        tEnv.useCatalog("HIVE");
+        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
+        tEnv.executeSql(
+                "CREATE TABLE hivetable (id string) PARTITIONED BY (id2 int, id3 int) STORED AS "
+                        + format);
+        tEnv.executeSql("INSERT INTO hivetable VALUES" + data(100)).await();
+
+        // Expected rows: the Hive table's content, read before the migration runs.
+        tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
+        tEnv.executeSql("CREATE CATALOG PAIMON_GE WITH ('type'='paimon-generic')");
+        tEnv.useCatalog("PAIMON_GE");
+        List<Row> expected =
+                ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM hivetable").collect());
+
+        tEnv.executeSql(
+                "CREATE CATALOG PAIMON WITH ('type'='paimon', 'metastore' = 'hive', 'uri' = 'thrift://localhost:"
+                        + PORT
+                        + "' , 'warehouse' = '"
+                        + System.getProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname)
+                        + "')");
+        tEnv.useCatalog("PAIMON");
+        tEnv.executeSql(
+                "CREATE TABLE paimontable (id STRING, id2 INT, id3 INT) PARTITIONED BY (id2, id3) with ('bucket' = '-1');");
+        tEnv.executeSql("CALL sys.migrate_file('hive', 'default.hivetable', 'default.paimontable')")
+                .await();
+        List<Row> actual =
+                ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM paimontable").collect());
+
+        Assertions.assertThatList(actual).containsExactlyInAnyOrderElementsOf(expected);
+    }
+
+    /**
+     * Builds the tuples of a {@code VALUES} clause: {@code rowCount} rows whose first column is
+     * the unique string "a0", "a1", ... and whose other two columns are random ints in [0, 10).
+     */
+    private static String data(int rowCount) {
+        Random random = new Random();
+        StringBuilder values = new StringBuilder();
+        for (int i = 0; i < rowCount; i++) {
+            if (i > 0) {
+                values.append(',');
+            }
+            // Append "a" as a String: 'a' + i would be char arithmetic and yield an int.
+            values.append("(\"a")
+                    .append(i)
+                    .append("\",")
+                    .append(random.nextInt(10))
+                    .append(',')
+                    .append(random.nextInt(10))
+                    .append(')');
+        }
+        return values.toString();
+    }
+}
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureTest.java
 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureTest.java
new file mode 100644
index 000000000..a7630a8fe
--- /dev/null
+++ 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureTest.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.hive.procedure;
+
+import org.apache.paimon.flink.action.ActionITCaseBase;
+import org.apache.paimon.flink.action.MigrateTableAction;
+import org.apache.paimon.flink.procedure.MigrateTableProcedure;
+import org.apache.paimon.hive.TestHiveMetastore;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.SqlDialect;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+/** Tests for {@link MigrateTableProcedure} and {@link MigrateTableAction}. */
+public class MigrateTableProcedureTest extends ActionITCaseBase {
+
+    private static final TestHiveMetastore TEST_HIVE_METASTORE = new TestHiveMetastore();
+
+    private static final int PORT = 9084;
+
+    /** Hive column clause of a table partitioned by (id2, id3). */
+    private static final String PARTITIONED_SCHEMA =
+            "(id string) PARTITIONED BY (id2 int, id3 int)";
+
+    /** Hive column clause of an unpartitioned table with the same columns. */
+    private static final String NON_PARTITIONED_SCHEMA = "(id string, id2 int, id3 int)";
+
+    @BeforeEach
+    public void beforeEach() {
+        TEST_HIVE_METASTORE.start(PORT);
+    }
+
+    @AfterEach
+    public void afterEach() throws Exception {
+        TEST_HIVE_METASTORE.stop();
+    }
+
+    @Test
+    public void testOrc() throws Exception {
+        testUpgradeTable(NON_PARTITIONED_SCHEMA, "orc");
+        resetMetastore();
+        testUpgradeTable(PARTITIONED_SCHEMA, "orc");
+    }
+
+    @Test
+    public void testParquet() throws Exception {
+        testUpgradeTable(NON_PARTITIONED_SCHEMA, "parquet");
+        resetMetastore();
+        testUpgradeTable(PARTITIONED_SCHEMA, "parquet");
+    }
+
+    /**
+     * Stops, resets and restarts the shared metastore. Both migrations of a test create the Hive
+     * table {@code default.hivetable}, so the second one must start from a clean metastore.
+     */
+    private void resetMetastore() throws Exception {
+        TEST_HIVE_METASTORE.stop();
+        TEST_HIVE_METASTORE.reset();
+        TEST_HIVE_METASTORE.start(PORT);
+    }
+
+    /**
+     * Creates {@code hivetable} with the given column clause and file format, migrates it with
+     * {@code CALL sys.migrate_table}, and checks that reading {@code hivetable} through the Paimon
+     * catalog returns exactly the rows read before the migration.
+     */
+    private void testUpgradeTable(String schema, String format) throws Exception {
+        TableEnvironment tEnv = createHiveTable(schema, format);
+        List<Row> expected = readThroughGenericCatalog(tEnv);
+
+        usePaimonCatalog(tEnv);
+        tEnv.executeSql(
+                        "CALL sys.migrate_table('hive', 'default.hivetable', 'file.format="
+                                + format
+                                + "')")
+                .await();
+        List<Row> actual = collectHiveTable(tEnv);
+
+        Assertions.assertThatList(actual).containsExactlyInAnyOrderElementsOf(expected);
+    }
+
+    @ParameterizedTest
+    @ValueSource(strings = {"orc", "parquet"})
+    public void testMigrateAction(String format) throws Exception {
+        TableEnvironment tEnv = createHiveTable(PARTITIONED_SCHEMA, format);
+        List<Row> expected = readThroughGenericCatalog(tEnv);
+
+        Map<String, String> catalogConf = new HashMap<>();
+        catalogConf.put("metastore", "hive");
+        catalogConf.put("uri", "thrift://localhost:" + PORT);
+        MigrateTableAction migrateTableAction =
+                new MigrateTableAction(
+                        "hive",
+                        System.getProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname),
+                        "default.hivetable",
+                        catalogConf,
+                        "");
+        migrateTableAction.run();
+
+        usePaimonCatalog(tEnv);
+        List<Row> actual = collectHiveTable(tEnv);
+
+        Assertions.assertThatList(actual).containsExactlyInAnyOrderElementsOf(expected);
+    }
+
+    /**
+     * Creates a batch table environment with a Hive catalog, runs {@code CREATE TABLE hivetable
+     * <schema> STORED AS <format>} in the Hive dialect and inserts 100 rows. The returned
+     * environment is switched back to the default SQL dialect.
+     */
+    private TableEnvironment createHiveTable(String schema, String format) throws Exception {
+        StreamExecutionEnvironment env = buildDefaultEnv(false);
+        TableEnvironment tEnv =
+                StreamTableEnvironment.create(env, EnvironmentSettings.inBatchMode());
+        tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')");
+        tEnv.useCatalog("HIVE");
+        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
+        tEnv.executeSql("CREATE TABLE hivetable " + schema + " STORED AS " + format);
+        tEnv.executeSql("INSERT INTO hivetable VALUES" + data(100)).await();
+        tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
+        return tEnv;
+    }
+
+    /** Switches to a 'paimon-generic' catalog and collects all rows of {@code hivetable}. */
+    private static List<Row> readThroughGenericCatalog(TableEnvironment tEnv) {
+        tEnv.executeSql("CREATE CATALOG PAIMON_GE WITH ('type'='paimon-generic')");
+        tEnv.useCatalog("PAIMON_GE");
+        return collectHiveTable(tEnv);
+    }
+
+    /** Registers a Paimon catalog backed by the test Hive metastore and switches to it. */
+    private static void usePaimonCatalog(TableEnvironment tEnv) {
+        tEnv.executeSql(
+                "CREATE CATALOG PAIMON WITH ('type'='paimon', 'metastore' = 'hive', 'uri' = 'thrift://localhost:"
+                        + PORT
+                        + "' , 'warehouse' = '"
+                        + System.getProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname)
+                        + "')");
+        tEnv.useCatalog("PAIMON");
+    }
+
+    /** Collects every row of {@code hivetable} from the catalog currently in use. */
+    private static List<Row> collectHiveTable(TableEnvironment tEnv) {
+        return ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM hivetable").collect());
+    }
+
+    /**
+     * Builds the tuples of a {@code VALUES} clause: {@code rowCount} rows whose first column is
+     * the unique string "a0", "a1", ... and whose other two columns are random ints in [0, 10).
+     */
+    private static String data(int rowCount) {
+        Random random = new Random();
+        StringBuilder values = new StringBuilder();
+        for (int i = 0; i < rowCount; i++) {
+            if (i > 0) {
+                values.append(',');
+            }
+            // Append "a" as a String: 'a' + i would be char arithmetic and yield an int.
+            values.append("(\"a")
+                    .append(i)
+                    .append("\",")
+                    .append(random.nextInt(10))
+                    .append(',')
+                    .append(random.nextInt(10))
+                    .append(')');
+        }
+        return values.toString();
+    }
+}

Reply via email to