This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new c7a17b2b3 [flink][hive] Introduce procedure to migrate table from hive
to paimon (#2368)
c7a17b2b3 is described below
commit c7a17b2b37e48a04e49e06b56fd3c58cc969cc66
Author: YeJunHao <[email protected]>
AuthorDate: Tue Nov 28 11:57:28 2023 +0800
[flink][hive] Introduce procedure to migrate table from hive to paimon
(#2368)
---
docs/content/migration/migration-from-hive.md | 106 ++++++
.../java/org/apache/paimon/data/BinaryWriter.java | 8 +-
.../apache/paimon/format/TableStatsExtractor.java | 17 +
.../org/apache/paimon/utils/ParameterUtils.java | 6 +-
.../org/apache/paimon/catalog/AbstractCatalog.java | 4 +
.../org/apache/paimon/migrate/FileMetaUtils.java | 186 +++++++++++
.../java/org/apache/paimon/migrate/Migrator.java | 13 +-
.../paimon/stats/TestTableStatsExtractor.java | 9 +-
paimon-flink/paimon-flink-common/pom.xml | 1 +
.../java/org/apache/paimon/flink/FlinkCatalog.java | 7 +-
.../paimon/flink/action/MigrateTableAction.java | 53 +++
.../flink/action/MigrateTableActionFactory.java | 62 ++++
.../flink/procedure/MigrateFileProcedure.java | 65 ++++
.../flink/procedure/MigrateTableProcedure.java | 72 +++++
.../paimon/flink/procedure/ProcedureBase.java | 2 +-
.../paimon/flink/utils/TableMigrationUtils.java | 51 +++
.../services/org.apache.paimon.factories.Factory | 3 +
.../format/orc/filter/OrcTableStatsExtractor.java | 37 ++-
.../format/parquet/ParquetTableStatsExtractor.java | 29 +-
.../apache/paimon/format/parquet/ParquetUtil.java | 8 +-
.../java/org/apache/paimon/hive/HiveCatalog.java | 14 +-
.../apache/paimon/hive/migrate/HiveMigrator.java | 360 +++++++++++++++++++++
.../org/apache/paimon/hive/TestHiveMetastore.java | 50 ++-
paimon-hive/paimon-hive-connector-common/pom.xml | 16 +
.../hive/procedure/MigrateFileProcedureTest.java | 122 +++++++
.../hive/procedure/MigrateTableProcedureTest.java | 216 +++++++++++++
26 files changed, 1455 insertions(+), 62 deletions(-)
diff --git a/docs/content/migration/migration-from-hive.md
b/docs/content/migration/migration-from-hive.md
new file mode 100644
index 000000000..831d8102b
--- /dev/null
+++ b/docs/content/migration/migration-from-hive.md
@@ -0,0 +1,106 @@
+---
+title: "Migration From Hive"
+weight: 1
+type: docs
+aliases:
+- /migration/migration-from-hive.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Hive Table Migration
+
+Apache Hive supports the ORC and Parquet file formats, and tables stored in these formats can be migrated to Paimon.
+When migrating data to a Paimon table, the original table will permanently disappear, so please back up your data if you
+still need the original table. The migrated table will be an [unaware-bucket append-only table]({{< ref "concepts/append-only-table#append-for-scalable-table" >}}).
+
+You can use the Paimon Hive catalog with the Migrate Table Procedure and the Migrate File Procedure to fully migrate a table from Hive to Paimon.
+
+* Migrate Table Procedure: the Paimon table does not exist yet; use this procedure to upgrade the Hive table to a Paimon table. The Hive table will disappear after the action is done.
+* Migrate File Procedure: the Paimon table already exists; use this procedure to migrate files from the Hive table to the Paimon table. **Note that the Hive table will also disappear after the action is done.**
+
+Currently, these two actions only support Hive tables stored in the "orc" and "parquet" file formats. If your table partitions are stored in another format, such as avro, these procedures will fail.
+Avro support is planned for the future. For now, please make sure your table partitions are stored in "orc" or "parquet" format.
+
+<span style="color: red; "> **We highly recommend backing up the Hive table data before migrating, because the migration is not atomic. If it is interrupted while migrating, you may lose your data.** </span>
+
+## Example for Migration
+
+**Migrate Hive Table**
+
+Command: <br>
+
+***CALL <font color="green">sys.migrate_table</font>('hive',
'<hive_database>.<hive_tablename>',
'<paimon_tableconf>');***
+
+**Example**
+
+```sql
+CREATE CATALOG PAIMON WITH ('type'='paimon', 'metastore' = 'hive', 'uri' =
'thrift://localhost:9083', 'warehouse'='/path/to/warehouse/');
+
+USE CATALOG PAIMON;
+
+CALL sys.migrate_table('hive', 'default.hivetable', 'file.format=orc');
+```
+After the call, "hivetable" will be fully converted to Paimon format. Writing and reading the table the old "Hive way" will fail.
+You can add table properties while migrating by calling `sys.migrate_table('hive', '<database>.<tablename>', '<tableproperties>')`.
+`<tableproperties>` here should be separated by ",". For example:
+```sql
+CALL sys.migrate_table('hive', 'my_db.wait_to_upgrade', 'file.format=orc,read.batch-size=2096,write-only=true');
+```
+
+If your Flink version is below 1.17, you can use the Flink action to achieve this:
+```bash
+<FLINK_HOME>/bin/flink run \
+/path/to/paimon-flink-action-{{< version >}}.jar \
+migrate_table \
+--warehouse <warehouse-path> \
+--source-table-type hive \
+--source-table-id <database.table-name> \
+[--catalog-conf <paimon-catalog-conf> [--catalog-conf <paimon-catalog-conf> ...]] \
+[--table-properties <key>=<value>,<key>=<value>,...]
+```
+
+Example:
+```bash
+<FLINK_HOME>/bin/flink run ./paimon-flink-action-0.7-SNAPSHOT.jar migrate_table \
+--warehouse /path/to/warehouse \
+--catalog-conf uri=thrift://localhost:9083 \
+--catalog-conf metastore=hive \
+--source-table-type hive \
+--source-table-id default.hive_or_paimon
+```
+
+**Migrate Hive File**
+
+Command: <br>
+
+***CALL <font color="green">sys.migrate_file</font>('hive',
'<hive_database>.<hive_table_name>',
'<paimon_database>.<paimon_tablename>');***
+
+**Example**
+
+```sql
+CREATE CATALOG PAIMON WITH ('type'='paimon', 'metastore' = 'hive', 'uri' =
'thrift://localhost:9083', 'warehouse'='/path/to/warehouse/');
+
+USE CATALOG PAIMON;
+
+CALL sys.migrate_file('hive', 'default.hivetable', 'default.paimontable');
+```
+After the call, "hivetable" will disappear, and all of its files will be moved and renamed into the Paimon table's directory. "paimontable" must have the same
+partition keys as "hivetable", and "paimontable" should be in unaware-bucket mode.
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/BinaryWriter.java
b/paimon-common/src/main/java/org/apache/paimon/data/BinaryWriter.java
index 0da3bced2..1c037425c 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/BinaryWriter.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/BinaryWriter.java
@@ -144,9 +144,9 @@ public interface BinaryWriter {
}
/**
- * Creates an accessor for setting the elements of an array writer during
runtime.
+ * Creates an accessor for setting the elements of a binary writer during
runtime.
*
- * @param elementType the element type of the array
+ * @param elementType the element type
*/
static ValueSetter createValueSetter(DataType elementType) {
// ordered by type root definition
@@ -208,8 +208,8 @@ public interface BinaryWriter {
}
}
- /** Accessor for setting the elements of an array writer during runtime. */
+ /** Accessor for setting the elements of a binary writer during runtime. */
interface ValueSetter extends Serializable {
- void setValue(BinaryArrayWriter writer, int pos, Object value);
+ void setValue(BinaryWriter writer, int pos, Object value);
}
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/format/TableStatsExtractor.java
b/paimon-common/src/main/java/org/apache/paimon/format/TableStatsExtractor.java
index e5e8e3eb0..4fae3d5e5 100644
---
a/paimon-common/src/main/java/org/apache/paimon/format/TableStatsExtractor.java
+++
b/paimon-common/src/main/java/org/apache/paimon/format/TableStatsExtractor.java
@@ -20,6 +20,7 @@ package org.apache.paimon.format;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.utils.Pair;
import java.io.IOException;
@@ -27,4 +28,20 @@ import java.io.IOException;
public interface TableStatsExtractor {
FieldStats[] extract(FileIO fileIO, Path path) throws IOException;
+
+ Pair<FieldStats[], FileInfo> extractWithFileInfo(FileIO fileIO, Path path)
throws IOException;
+
+ /** File info fetched from physical file. */
+ class FileInfo {
+
+ private long rowCount;
+
+ public FileInfo(long rowCount) {
+ this.rowCount = rowCount;
+ }
+
+ public long getRowCount() {
+ return rowCount;
+ }
+ }
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/utils/ParameterUtils.java
b/paimon-common/src/main/java/org/apache/paimon/utils/ParameterUtils.java
index 693ba494e..5d12cf27f 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/ParameterUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/ParameterUtils.java
@@ -36,8 +36,10 @@ public class ParameterUtils {
public static Map<String, String> parseCommaSeparatedKeyValues(String
keyValues) {
Map<String, String> kvs = new HashMap<>();
- for (String kvString : keyValues.split(",")) {
- parseKeyValueString(kvs, kvString);
+ if (!StringUtils.isBlank(keyValues)) {
+ for (String kvString : keyValues.split(",")) {
+ parseKeyValueString(kvs, kvString);
+ }
}
return kvs;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index a54ea924d..c01baa774 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -350,6 +350,10 @@ public abstract class AbstractCatalog implements Catalog {
tableDefaultOptions.forEach(options::putIfAbsent);
}
+ public FileIO fileIO() {
+ return fileIO;
+ }
+
private String[] tableAndSystemName(Identifier identifier) {
String[] splits = StringUtils.split(identifier.getObjectName(),
SYSTEM_TABLE_SPLITTER);
if (splits.length != 2) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
new file mode 100644
index 000000000..c39a68001
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.migrate;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryRowWriter;
+import org.apache.paimon.data.BinaryWriter;
+import org.apache.paimon.format.FieldStats;
+import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.format.TableStatsExtractor;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileStatus;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.NewFilesIncrement;
+import org.apache.paimon.statistics.FieldStatsCollector;
+import org.apache.paimon.stats.BinaryTableStats;
+import org.apache.paimon.stats.FieldStatsArraySerializer;
+import org.apache.paimon.table.AbstractFileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.StatsCollectorFactories;
+import org.apache.paimon.utils.TypeUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+/** To construct {@link DataFileMeta} for external files, e.g. the data files of a Hive table. */
+public class FileMetaUtils {
+
+    /**
+     * Moves the external data files under {@code location} into {@code dir} and builds a {@link
+     * DataFileMeta} for each of them.
+     *
+     * <p>Only regular files (directories are skipped) directly under {@code location} that pass
+     * {@code filter} are handled. For every file a (new path -> original path) entry is put into
+     * {@code rollback} so that the caller can move files back if the migration fails.
+     *
+     * @param fileIO file system used to list, rename and read the files
+     * @param format file format name, also used as the file name suffix (e.g. "orc")
+     * @param location directory holding the external files
+     * @param paimonTable target table; it is cast to {@link AbstractFileStoreTable} internally
+     * @param filter extra predicate selecting the files to migrate
+     * @param dir directory the files are moved into
+     * @param rollback receives a (new path -> original path) entry for each moved file
+     * @return the metas of the moved files
+     * @throws IOException if listing {@code location} fails
+     */
+    public static List<DataFileMeta> construct(
+            FileIO fileIO,
+            String format,
+            String location,
+            Table paimonTable,
+            Predicate<FileStatus> filter,
+            Path dir,
+            Map<Path, Path> rollback)
+            throws IOException {
+        List<FileStatus> fileStatuses =
+                Arrays.stream(fileIO.listStatus(new Path(location)))
+                        .filter(s -> !s.isDir())
+                        .filter(filter)
+                        .collect(Collectors.toList());
+
+        return fileStatuses.stream()
+                .map(
+                        status ->
+                                constructFileMeta(
+                                        format, status, fileIO, paimonTable, dir, rollback))
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Wraps the given new files of one partition into a {@link CommitMessage} for bucket 0, with
+     * empty changelog and compaction increments.
+     */
+    public static CommitMessage commitFile(BinaryRow partition, List<DataFileMeta> dataFileMetas) {
+        return new CommitMessageImpl(
+                partition,
+                0,
+                new NewFilesIncrement(dataFileMetas, Collections.emptyList()),
+                new CompactIncrement(
+                        Collections.emptyList(), Collections.emptyList(), Collections.emptyList()));
+    }
+
+    // -----------------------------private method---------------------------------------------
+
+    /**
+     * Moves a single external file into {@code dir} and builds its {@link DataFileMeta} from the
+     * statistics read out of the moved file. I/O failures are rethrown as {@link RuntimeException}
+     * because this is called from inside a stream pipeline.
+     */
+    private static DataFileMeta constructFileMeta(
+            String format,
+            FileStatus fileStatus,
+            FileIO fileIO,
+            Table table,
+            Path dir,
+            Map<Path, Path> rollback) {
+
+        try {
+            FieldStatsCollector.Factory[] factories =
+                    StatsCollectorFactories.createStatsFactories(
+                            ((AbstractFileStoreTable) table).coreOptions(),
+                            table.rowType().getFieldNames());
+
+            TableStatsExtractor tableStatsExtractor =
+                    FileFormat.getFileFormat(
+                                    ((AbstractFileStoreTable) table)
+                                            .coreOptions()
+                                            .toConfiguration(),
+                                    format)
+                            .createStatsExtractor(table.rowType(), factories)
+                            .orElseThrow(
+                                    () ->
+                                            new RuntimeException(
+                                                    "Can't get table stats extractor for format "
+                                                            + format));
+            Path newPath = renameFile(fileIO, fileStatus.getPath(), dir, format, rollback);
+            return constructFileMeta(
+                    newPath.getName(),
+                    fileStatus.getLen(),
+                    newPath,
+                    tableStatsExtractor,
+                    fileIO,
+                    table);
+        } catch (IOException e) {
+            throw new RuntimeException("error when construct file meta", e);
+        }
+    }
+
+    /**
+     * Renames {@code originPath} into {@code newDir}, appending "." + format to the file name
+     * unless it already ends with it.
+     *
+     * @throws IOException if the rename fails or is reported as unsuccessful
+     */
+    private static Path renameFile(
+            FileIO fileIO, Path originPath, Path newDir, String format, Map<Path, Path> rollback)
+            throws IOException {
+        String suffix = "." + format;
+        String fileName = originPath.getName();
+        String newFileName = fileName.endsWith(suffix) ? fileName : fileName + suffix;
+        Path newPath = new Path(newDir, newFileName);
+        // Record before renaming so that a rename which throws half-way can still be undone.
+        rollback.put(newPath, originPath);
+        if (!fileIO.rename(originPath, newPath)) {
+            // The rename reported failure; do not keep a rollback entry that could later move an
+            // unrelated file at newPath back to the origin, and do not continue with a file that
+            // was not moved.
+            rollback.remove(newPath);
+            throw new IOException(
+                    String.format("Failed to rename file from %s to %s", originPath, newPath));
+        }
+        return newPath;
+    }
+
+    /**
+     * Builds an append-only {@link DataFileMeta} for a file that already lives at {@code path},
+     * taking field statistics and the row count from {@code tableStatsExtractor}.
+     */
+    private static DataFileMeta constructFileMeta(
+            String fileName,
+            long fileSize,
+            Path path,
+            TableStatsExtractor tableStatsExtractor,
+            FileIO fileIO,
+            Table table)
+            throws IOException {
+        FieldStatsArraySerializer statsArraySerializer =
+                new FieldStatsArraySerializer(table.rowType());
+
+        Pair<FieldStats[], TableStatsExtractor.FileInfo> fileInfo =
+                tableStatsExtractor.extractWithFileInfo(fileIO, path);
+        BinaryTableStats stats = statsArraySerializer.toBinary(fileInfo.getLeft());
+
+        return DataFileMeta.forAppend(
+                fileName,
+                fileSize,
+                fileInfo.getRight().getRowCount(),
+                stats,
+                0,
+                0,
+                ((AbstractFileStoreTable) table).schema().id());
+    }
+
+    /**
+     * Builds a partition {@link BinaryRow}: for each field of {@code partitionRowType}, the string
+     * value is looked up by field name in {@code partitionValues}, cast to the field type and
+     * written with the value setter at the same position.
+     *
+     * <p>NOTE(review): {@code valueSetters} is assumed to be aligned with the fields of {@code
+     * partitionRowType}; a missing partition value is passed to the cast as {@code null}.
+     */
+    public static BinaryRow writePartitionValue(
+            RowType partitionRowType,
+            Map<String, String> partitionValues,
+            List<BinaryWriter.ValueSetter> valueSetters) {
+
+        BinaryRow binaryRow = new BinaryRow(partitionRowType.getFieldCount());
+        BinaryRowWriter binaryRowWriter = new BinaryRowWriter(binaryRow);
+
+        List<DataField> fields = partitionRowType.getFields();
+
+        for (int i = 0; i < fields.size(); i++) {
+            Object value =
+                    TypeUtils.castFromString(
+                            partitionValues.get(fields.get(i).name()), fields.get(i).type());
+            valueSetters.get(i).setValue(binaryRowWriter, i, value);
+        }
+        binaryRowWriter.complete();
+        return binaryRow;
+    }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/format/TableStatsExtractor.java
b/paimon-core/src/main/java/org/apache/paimon/migrate/Migrator.java
similarity index 73%
copy from
paimon-common/src/main/java/org/apache/paimon/format/TableStatsExtractor.java
copy to paimon-core/src/main/java/org/apache/paimon/migrate/Migrator.java
index e5e8e3eb0..7420528e4 100644
---
a/paimon-common/src/main/java/org/apache/paimon/format/TableStatsExtractor.java
+++ b/paimon-core/src/main/java/org/apache/paimon/migrate/Migrator.java
@@ -16,15 +16,10 @@
* limitations under the License.
*/
-package org.apache.paimon.format;
+package org.apache.paimon.migrate;
-import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.Path;
+/** Migrator interface for migrating table from other data-lake like hive,
iceberg, hudi and etc. */
+public interface Migrator {
-import java.io.IOException;
-
-/** Extracts statistics directly from file. */
-public interface TableStatsExtractor {
-
- FieldStats[] extract(FileIO fileIO, Path path) throws IOException;
+ void executeMigrate() throws Exception;
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/stats/TestTableStatsExtractor.java
b/paimon-core/src/test/java/org/apache/paimon/stats/TestTableStatsExtractor.java
index 530fc9b5b..7c9572653 100644
---
a/paimon-core/src/test/java/org/apache/paimon/stats/TestTableStatsExtractor.java
+++
b/paimon-core/src/test/java/org/apache/paimon/stats/TestTableStatsExtractor.java
@@ -30,6 +30,7 @@ import org.apache.paimon.statistics.FieldStatsCollector;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileUtils;
import org.apache.paimon.utils.ObjectSerializer;
+import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Preconditions;
import java.io.IOException;
@@ -57,6 +58,12 @@ public class TestTableStatsExtractor implements
TableStatsExtractor {
@Override
public FieldStats[] extract(FileIO fileIO, Path path) throws IOException {
+ return extractWithFileInfo(fileIO, path).getLeft();
+ }
+
+ @Override
+ public Pair<FieldStats[], FileInfo> extractWithFileInfo(FileIO fileIO,
Path path)
+ throws IOException {
IdentityObjectSerializer serializer = new
IdentityObjectSerializer(rowType);
FormatReaderFactory readerFactory =
format.createReaderFactory(rowType);
List<InternalRow> records =
@@ -66,7 +73,7 @@ public class TestTableStatsExtractor implements
TableStatsExtractor {
for (InternalRow record : records) {
statsCollector.collect(record);
}
- return statsCollector.extract();
+ return Pair.of(statsCollector.extract(), new FileInfo(records.size()));
}
private static class IdentityObjectSerializer extends
ObjectSerializer<InternalRow> {
diff --git a/paimon-flink/paimon-flink-common/pom.xml
b/paimon-flink/paimon-flink-common/pom.xml
index e76526445..c5a7d32a7 100644
--- a/paimon-flink/paimon-flink-common/pom.xml
+++ b/paimon-flink/paimon-flink-common/pom.xml
@@ -35,6 +35,7 @@ under the License.
<properties>
<flink.version>1.18.0</flink.version>
+ <hive.version>2.3.9</hive.version>
</properties>
<dependencies>
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index 84c628f9e..9a180c678 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -349,12 +349,9 @@ public class FlinkCatalog extends AbstractCatalog {
}
// remove table path
- String specific = options.remove(PATH.key());
- if (specific != null || logStoreAutoRegister) {
- catalogTable = catalogTable.copy(options);
- }
+ options.remove(PATH.key());
- return fromCatalogTable(catalogTable);
+ return fromCatalogTable(catalogTable.copy(options));
}
private List<SchemaChange> toSchemaChange(
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableAction.java
new file mode 100644
index 000000000..ae7ff0216
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableAction.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.flink.procedure.MigrateTableProcedure;
+
+import org.apache.flink.table.procedure.DefaultProcedureContext;
+
+import java.util.Map;
+
+/** Action that migrates an external (currently Hive) table into a Paimon table. */
+public class MigrateTableAction extends ActionBase {
+
+    /** Source table type passed to the procedure, e.g. "hive". */
+    private final String connector;
+
+    /** Source table identifier in "database.table" form. */
+    private final String hiveTableFullName;
+
+    /** Comma separated key=value options for the migrated table. */
+    private final String tableProperties;
+
+    public MigrateTableAction(
+            String connector,
+            String warehouse,
+            String hiveTableFullName,
+            Map<String, String> catalogConfig,
+            String tableProperties) {
+        super(warehouse, catalogConfig);
+        this.connector = connector;
+        this.hiveTableFullName = hiveTableFullName;
+        this.tableProperties = tableProperties;
+    }
+
+    /** Runs {@link MigrateTableProcedure} against this action's catalog. */
+    @Override
+    public void run() throws Exception {
+        MigrateTableProcedure procedure = new MigrateTableProcedure();
+        procedure.withCatalog(catalog);
+        DefaultProcedureContext context = new DefaultProcedureContext(env);
+        procedure.call(context, connector, hiveTableFullName, tableProperties);
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableActionFactory.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableActionFactory.java
new file mode 100644
index 000000000..9efbe6cf4
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableActionFactory.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.flink.api.java.utils.MultipleParameterTool;
+
+import java.util.Map;
+import java.util.Optional;
+
+/** Action Factory for {@link MigrateTableAction}. */
+public class MigrateTableActionFactory implements ActionFactory {
+
+    public static final String IDENTIFIER = "migrate_table";
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+
+    /**
+     * Builds a {@link MigrateTableAction} from the command line arguments {@code --warehouse},
+     * {@code --source-table-type}, {@code --source-table-id}, {@code --catalog-conf key=value} and
+     * {@code --table-properties key1=value1,key2=value2}.
+     */
+    @Override
+    public Optional<Action> create(MultipleParameterTool params) {
+        String warehouse = params.get("warehouse");
+        String connector = params.get("source-table-type");
+        String sourceHiveTable = params.get("source-table-id");
+        Map<String, String> catalogConfig = optionalConfigMap(params, "catalog-conf");
+        String tableConf = params.get("table-properties");
+
+        MigrateTableAction migrateTableAction =
+                new MigrateTableAction(
+                        connector, warehouse, sourceHiveTable, catalogConfig, tableConf);
+        return Optional.of(migrateTableAction);
+    }
+
+    /** Prints the usage of the {@code migrate_table} action. */
+    @Override
+    public void printHelp() {
+        System.out.println("Action \"migrate_table\" runs a migrating job from hive to paimon.");
+        System.out.println();
+
+        System.out.println("Syntax:");
+        // The action name must match IDENTIFIER; the placeholders mirror the keys read in create().
+        System.out.println(
+                "  migrate_table --warehouse <warehouse-path> --source-table-type hive "
+                        + "--source-table-id <database.table_name> "
+                        + "[--catalog-conf <key>=<value>] "
+                        + "[--table-properties <key>=<value>,<key>=<value>,...]");
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java
new file mode 100644
index 000000000..c3e197291
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.utils.TableMigrationUtils;
+import org.apache.paimon.hive.HiveCatalog;
+
+import org.apache.flink.table.procedure.ProcedureContext;
+
+import java.util.Collections;
+
+/**
+ * Procedure {@code migrate_file}: moves the data files of a source (Hive) table into an already
+ * existing Paimon table.
+ */
+public class MigrateFileProcedure extends ProcedureBase {
+
+    @Override
+    public String identifier() {
+        return "migrate_file";
+    }
+
+    /**
+     * Migrates the files of {@code sourceTablePath} into {@code targetPaimonTablePath}.
+     *
+     * @param connector source table type, resolved by {@link TableMigrationUtils}
+     * @param sourceTablePath source table as "database.table"
+     * @param targetPaimonTablePath existing target Paimon table as "database.table"
+     * @return a single-element array containing "Success"
+     * @throws IllegalArgumentException if the catalog is not a {@link HiveCatalog} or the target
+     *     table does not exist
+     */
+    public String[] call(
+            ProcedureContext procedureContext,
+            String connector,
+            String sourceTablePath,
+            String targetPaimonTablePath)
+            throws Exception {
+        if (!(catalog instanceof HiveCatalog)) {
+            throw new IllegalArgumentException("Only support Hive Catalog");
+        }
+        HiveCatalog hiveCatalog = (HiveCatalog) catalog;
+        Identifier source = Identifier.fromString(sourceTablePath);
+        Identifier target = Identifier.fromString(targetPaimonTablePath);
+
+        // Unlike migrate_table, this procedure never creates the target table.
+        boolean targetExists = hiveCatalog.tableExists(target);
+        if (!targetExists) {
+            throw new IllegalArgumentException(
+                    "Target paimon table does not exist: " + targetPaimonTablePath);
+        }
+
+        TableMigrationUtils.getImporter(
+                        connector,
+                        hiveCatalog,
+                        source.getDatabaseName(),
+                        source.getObjectName(),
+                        target.getDatabaseName(),
+                        target.getObjectName(),
+                        Collections.emptyMap())
+                .executeMigrate();
+        return new String[] {"Success"};
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java
new file mode 100644
index 000000000..31b86dbb6
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.utils.TableMigrationUtils;
+import org.apache.paimon.hive.HiveCatalog;
+import org.apache.paimon.utils.ParameterUtils;
+
+import org.apache.flink.table.procedure.ProcedureContext;
+
+/**
+ * Procedure {@code migrate_table}: migrates a source (Hive) table into a temporary Paimon table
+ * and then renames that table to the source table's identifier.
+ */
+public class MigrateTableProcedure extends ProcedureBase {
+
+    /** Appended to the source table path to name the temporary Paimon table. */
+    private static final String PAIMON_SUFFIX = "_paimon_";
+
+    @Override
+    public String identifier() {
+        return "migrate_table";
+    }
+
+    /** Same as the four-argument variant, with an empty property string. */
+    public String[] call(
+            ProcedureContext procedureContext, String connector, String sourceTablePath)
+            throws Exception {
+        final String noProperties = "";
+        return call(procedureContext, connector, sourceTablePath, noProperties);
+    }
+
+    /**
+     * Migrates {@code sourceTablePath} to Paimon.
+     *
+     * @param connector source table type, resolved by {@link TableMigrationUtils}
+     * @param sourceTablePath source table as "database.table"
+     * @param properties comma separated key=value options for the new Paimon table
+     * @return a single-element array containing "Success"
+     * @throws IllegalArgumentException if the catalog is not a {@link HiveCatalog}
+     */
+    public String[] call(
+            ProcedureContext procedureContext,
+            String connector,
+            String sourceTablePath,
+            String properties)
+            throws Exception {
+        if (!(catalog instanceof HiveCatalog)) {
+            throw new IllegalArgumentException("Only support Hive Catalog");
+        }
+        HiveCatalog hiveCatalog = (HiveCatalog) catalog;
+
+        Identifier sourceTableId = Identifier.fromString(sourceTablePath);
+        Identifier tempTableId = Identifier.fromString(sourceTablePath + PAIMON_SUFFIX);
+
+        TableMigrationUtils.getImporter(
+                        connector,
+                        hiveCatalog,
+                        sourceTableId.getDatabaseName(),
+                        sourceTableId.getObjectName(),
+                        tempTableId.getDatabaseName(),
+                        tempTableId.getObjectName(),
+                        ParameterUtils.parseCommaSeparatedKeyValues(properties))
+                .executeMigrate();
+
+        // Give the migrated Paimon table the original table's identifier.
+        hiveCatalog.renameTable(tempTableId, sourceTableId, false);
+        return new String[] {"Success"};
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java
index 17490cb4a..5f71d53c2 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java
@@ -41,7 +41,7 @@ public abstract class ProcedureBase implements Procedure,
Factory {
protected Catalog catalog;
- ProcedureBase withCatalog(Catalog catalog) {
+ public ProcedureBase withCatalog(Catalog catalog) {
this.catalog = catalog;
return this;
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java
new file mode 100644
index 000000000..32f263ab3
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.utils;
+
+import org.apache.paimon.hive.HiveCatalog;
+import org.apache.paimon.hive.migrate.HiveMigrator;
+import org.apache.paimon.migrate.Migrator;
+
+import java.util.Map;
+
+/** Migration util to choose importer according to connector. */
+public class TableMigrationUtils {
+
+ public static Migrator getImporter(
+ String connector,
+ HiveCatalog paimonCatalog,
+ String sourceDatabase,
+ String souceTableName,
+ String targetDatabase,
+ String targetTableName,
+ Map<String, String> options) {
+ switch (connector) {
+ case "hive":
+ return new HiveMigrator(
+ paimonCatalog,
+ sourceDatabase,
+ souceTableName,
+ targetDatabase,
+ targetTableName,
+ options);
+ default:
+ throw new UnsupportedOperationException("Don't support
connector " + connector);
+ }
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 1196880bf..bac5cdd83 100644
---
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -23,6 +23,7 @@ org.apache.paimon.flink.action.RollbackToActionFactory
org.apache.paimon.flink.action.CreateTagActionFactory
org.apache.paimon.flink.action.DeleteTagActionFactory
org.apache.paimon.flink.action.ResetConsumerActionFactory
+org.apache.paimon.flink.action.MigrateTableActionFactory
### procedure factories
org.apache.paimon.flink.procedure.CompactDatabaseProcedure
@@ -33,3 +34,5 @@ org.apache.paimon.flink.procedure.DropPartitionProcedure
org.apache.paimon.flink.procedure.MergeIntoProcedure
org.apache.paimon.flink.procedure.ResetConsumerProcedure
org.apache.paimon.flink.procedure.RollbackToProcedure
+org.apache.paimon.flink.procedure.MigrateTableProcedure
+org.apache.paimon.flink.procedure.MigrateFileProcedure
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/orc/filter/OrcTableStatsExtractor.java
b/paimon-format/src/main/java/org/apache/paimon/format/orc/filter/OrcTableStatsExtractor.java
index 588e04f92..a4e7acac9 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/orc/filter/OrcTableStatsExtractor.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/orc/filter/OrcTableStatsExtractor.java
@@ -31,6 +31,7 @@ import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DecimalType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.DateTimeUtils;
+import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Preconditions;
import org.apache.hadoop.conf.Configuration;
@@ -66,6 +67,12 @@ public class OrcTableStatsExtractor implements
TableStatsExtractor {
@Override
public FieldStats[] extract(FileIO fileIO, Path path) throws IOException {
+ return extractWithFileInfo(fileIO, path).getLeft();
+ }
+
+ @Override
+ public Pair<FieldStats[], FileInfo> extractWithFileInfo(FileIO fileIO,
Path path)
+ throws IOException {
try (Reader reader = OrcReaderFactory.createReader(new
Configuration(), fileIO, path)) {
long rowCount = reader.getNumberOfRows();
ColumnStatistics[] columnStatistics = reader.getStatistics();
@@ -76,16 +83,26 @@ public class OrcTableStatsExtractor implements
TableStatsExtractor {
FieldStatsCollector[] collectors =
FieldStatsCollector.create(statsCollectors);
- return IntStream.range(0, rowType.getFieldCount())
- .mapToObj(
- i -> {
- DataField field = rowType.getFields().get(i);
- int fieldIdx =
columnNames.indexOf(field.name());
- int colId = columnTypes.get(fieldIdx).getId();
- return toFieldStats(
- field, columnStatistics[colId],
rowCount, collectors[i]);
- })
- .toArray(FieldStats[]::new);
+ return Pair.of(
+ IntStream.range(0, rowType.getFieldCount())
+ .mapToObj(
+ i -> {
+ DataField field =
rowType.getFields().get(i);
+ int fieldIdx =
columnNames.indexOf(field.name());
+ if (fieldIdx == -1) {
+ return collectors[i].convert(
+ new FieldStats(null, null,
null));
+ } else {
+ int colId =
columnTypes.get(fieldIdx).getId();
+ return toFieldStats(
+ field,
+ columnStatistics[colId],
+ rowCount,
+ collectors[i]);
+ }
+ })
+ .toArray(FieldStats[]::new),
+ new FileInfo(rowCount));
}
}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetTableStatsExtractor.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetTableStatsExtractor.java
index e8836b8fe..9055f64b0 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetTableStatsExtractor.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetTableStatsExtractor.java
@@ -31,6 +31,7 @@ import org.apache.paimon.types.DecimalType;
import org.apache.paimon.types.LocalZonedTimestampType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Preconditions;
import org.apache.parquet.column.statistics.BinaryStatistics;
@@ -67,15 +68,27 @@ public class ParquetTableStatsExtractor implements
TableStatsExtractor {
@Override
public FieldStats[] extract(FileIO fileIO, Path path) throws IOException {
- Map<String, Statistics<?>> stats =
ParquetUtil.extractColumnStats(fileIO, path);
+ return extractWithFileInfo(fileIO, path).getLeft();
+ }
+
+ @Override
+ public Pair<FieldStats[], FileInfo> extractWithFileInfo(FileIO fileIO,
Path path)
+ throws IOException {
+ Pair<Map<String, Statistics<?>>, FileInfo> statsPair =
+ ParquetUtil.extractColumnStats(fileIO, path);
FieldStatsCollector[] collectors =
FieldStatsCollector.create(statsCollectors);
- return IntStream.range(0, rowType.getFieldCount())
- .mapToObj(
- i -> {
- DataField field = rowType.getFields().get(i);
- return toFieldStats(field,
stats.get(field.name()), collectors[i]);
- })
- .toArray(FieldStats[]::new);
+ return Pair.of(
+ IntStream.range(0, rowType.getFieldCount())
+ .mapToObj(
+ i -> {
+ DataField field =
rowType.getFields().get(i);
+ return toFieldStats(
+ field,
+
statsPair.getLeft().get(field.name()),
+ collectors[i]);
+ })
+ .toArray(FieldStats[]::new),
+ statsPair.getRight());
}
private FieldStats toFieldStats(
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetUtil.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetUtil.java
index b7aa5d956..05650da94 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetUtil.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetUtil.java
@@ -18,9 +18,11 @@
package org.apache.paimon.format.parquet;
+import org.apache.paimon.format.TableStatsExtractor;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.types.DataField;
+import org.apache.paimon.utils.Pair;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.column.statistics.Statistics;
@@ -44,8 +46,8 @@ public class ParquetUtil {
* @return result sets as map, key is column name, value is statistics
(for example, null count,
* minimum value, maximum value)
*/
- public static Map<String, Statistics<?>> extractColumnStats(FileIO fileIO,
Path path)
- throws IOException {
+ public static Pair<Map<String, Statistics<?>>,
TableStatsExtractor.FileInfo> extractColumnStats(
+ FileIO fileIO, Path path) throws IOException {
try (ParquetFileReader reader = getParquetReader(fileIO, path)) {
ParquetMetadata parquetMetadata = reader.getFooter();
List<BlockMetaData> blockMetaDataList =
parquetMetadata.getBlocks();
@@ -65,7 +67,7 @@ public class ParquetUtil {
resultStats.put(columnName, midStats);
}
}
- return resultStats;
+ return Pair.of(resultStats, new
TableStatsExtractor.FileInfo(reader.getRecordCount()));
}
}
diff --git
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index 987ec2c7c..60aef7942 100644
---
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -58,6 +58,7 @@ import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
+import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -515,7 +516,7 @@ public class HiveCatalog extends AbstractCatalog {
}
@VisibleForTesting
- IMetaStoreClient getHmsClient() {
+ public IMetaStoreClient getHmsClient() {
return client;
}
@@ -585,7 +586,6 @@ public class HiveCatalog extends AbstractCatalog {
HiveConf.setLoadMetastoreConfig(false);
HiveConf.setLoadHiveServer2Config(false);
HiveConf hiveConf = new HiveConf(hadoopConf, HiveConf.class);
-
org.apache.hadoop.fs.Path hiveSite =
new org.apache.hadoop.fs.Path(hiveConfDir, HIVE_SITE_FILE);
if (!hiveSite.toUri().isAbsolute()) {
@@ -603,7 +603,15 @@ public class HiveCatalog extends AbstractCatalog {
return hiveConf;
} else {
- return new HiveConf(hadoopConf, HiveConf.class);
+ HiveConf hiveConf = new HiveConf(hadoopConf, HiveConf.class);
+ // user doesn't provide hive conf dir, we try to find it in
classpath
+ URL hiveSite =
+
Thread.currentThread().getContextClassLoader().getResource(HIVE_SITE_FILE);
+ if (hiveSite != null) {
+ LOG.info("Found {} in classpath: {}", HIVE_SITE_FILE,
hiveSite);
+ hiveConf.addResource(hiveSite);
+ }
+ return hiveConf;
}
}
diff --git
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java
new file mode 100644
index 000000000..3a3b8f5ae
--- /dev/null
+++
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.hive.migrate;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryWriter;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileStatus;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.hive.HiveCatalog;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.migrate.FileMetaUtils;
+import org.apache.paimon.migrate.Migrator;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.AbstractFileStoreTable;
+import org.apache.paimon.table.AppendOnlyFileStoreTable;
+import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.PrimaryKeysRequest;
+import org.apache.hadoop.hive.metastore.api.Table;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.hive.HiveTypeUtils.toPaimonType;
+import static org.apache.paimon.utils.FileUtils.COMMON_IO_FORK_JOIN_POOL;
+
+/** Migrate hive table to paimon table. */
+public class HiveMigrator implements Migrator {
+
+ private static final Predicate<FileStatus> HIDDEN_PATH_FILTER =
+ p -> !p.getPath().getName().startsWith("_") &&
!p.getPath().getName().startsWith(".");
+
+ private final FileIO fileIO;
+ private final HiveCatalog hiveCatalog;
+ private final IMetaStoreClient client;
+ private final String sourceDatabase;
+ private final String sourceTable;
+ private final String targetDatabase;
+ private final String targetTable;
+ private final Map<String, String> options;
+
+ public HiveMigrator(
+ HiveCatalog hiveCatalog,
+ String sourceDatabase,
+ String sourceTable,
+ String targetDatabase,
+ String targetTable,
+ Map<String, String> options) {
+ this.hiveCatalog = hiveCatalog;
+ this.fileIO = hiveCatalog.fileIO();
+ this.client = hiveCatalog.getHmsClient();
+ this.sourceDatabase = sourceDatabase;
+ this.sourceTable = sourceTable;
+ this.targetDatabase = targetDatabase;
+ this.targetTable = targetTable;
+ this.options = options;
+ }
+
+ public void executeMigrate() throws Exception {
+ if (!client.tableExists(sourceDatabase, sourceTable)) {
+ throw new RuntimeException("Source hive table does not exist");
+ }
+
+ Table sourceHiveTable = client.getTable(sourceDatabase, sourceTable);
+ Map<String, String> properties = new
HashMap<>(sourceHiveTable.getParameters());
+ checkPrimaryKey();
+
+ AbstractFileStoreTable paimonTable =
+ createPaimonTableIfNotExists(
+ client.getSchema(sourceDatabase, sourceTable),
+ sourceHiveTable.getPartitionKeys(),
+ properties);
+ checkPaimonTable(paimonTable);
+
+ List<String> partitionsNames =
+ client.listPartitionNames(sourceDatabase, sourceTable,
Short.MAX_VALUE);
+ checkCompatible(sourceHiveTable, paimonTable);
+
+ List<MigrateTask> tasks = new ArrayList<>();
+ Map<Path, Path> rollBack = new ConcurrentHashMap<>();
+ if (partitionsNames.isEmpty()) {
+ tasks.add(importUnPartitionedTableTask(fileIO, sourceHiveTable,
paimonTable, rollBack));
+ } else {
+ tasks.addAll(
+ importPartitionedTableTask(
+ client,
+ fileIO,
+ partitionsNames,
+ sourceHiveTable,
+ paimonTable,
+ rollBack));
+ }
+
+ Queue<CommitMessage> commitMessages = new LinkedBlockingQueue<>();
+ List<Future<?>> futures = new ArrayList<>();
+ tasks.forEach(
+ task ->
+ futures.add(
+ COMMON_IO_FORK_JOIN_POOL.submit(
+ () ->
commitMessages.add(task.get()))));
+
+ try {
+ for (Future<?> future : futures) {
+ future.get();
+ }
+ } catch (Exception e) {
+ futures.forEach(f -> f.cancel(true));
+ for (Future<?> future : futures) {
+ // wait all task cancelled or finished
+ while (!future.isDone()) {
+ Thread.sleep(100);
+ }
+ }
+ // roll back all renamed path
+ for (Map.Entry<Path, Path> entry : rollBack.entrySet()) {
+ Path newPath = entry.getKey();
+ Path origin = entry.getValue();
+ if (fileIO.exists(newPath)) {
+ fileIO.rename(newPath, origin);
+ }
+ }
+
+ throw new RuntimeException("Migrating failed because exception
happens", e);
+ }
+
+ paimonTable.newBatchWriteBuilder().newCommit().commit(new
ArrayList<>(commitMessages));
+ client.dropTable(sourceDatabase, sourceTable, true, true);
+ }
+
+ private void checkPrimaryKey() throws Exception {
+ PrimaryKeysRequest primaryKeysRequest = new
PrimaryKeysRequest(sourceDatabase, sourceTable);
+ if (!client.getPrimaryKeys(primaryKeysRequest).isEmpty()) {
+ throw new IllegalArgumentException("Can't migrate primary key
table yet.");
+ }
+ }
+
+ private void checkPaimonTable(AbstractFileStoreTable paimonTable) {
+ if (!(paimonTable instanceof AppendOnlyFileStoreTable)) {
+ throw new IllegalArgumentException(
+ "Hive migrator only support append only table target
table");
+ }
+
+ if (paimonTable.store().bucketMode() != BucketMode.UNAWARE) {
+ throw new IllegalArgumentException(
+ "Hive migrator only support unaware-bucket target table");
+ }
+ }
+
+ private AbstractFileStoreTable createPaimonTableIfNotExists(
+ List<FieldSchema> fields,
+ List<FieldSchema> partitionFields,
+ Map<String, String> hiveTableOptions)
+ throws Exception {
+
+ Identifier identifier = Identifier.create(targetDatabase, targetTable);
+ if (!hiveCatalog.tableExists(identifier)) {
+ Schema schema = from(fields, partitionFields, hiveTableOptions);
+ hiveCatalog.createTable(identifier, schema, false);
+ }
+ return (AbstractFileStoreTable) hiveCatalog.getTable(identifier);
+ }
+
+ public Schema from(
+ List<FieldSchema> fields,
+ List<FieldSchema> partitionFields,
+ Map<String, String> hiveTableOptions) {
+ HashMap<String, String> paimonOptions = new HashMap<>(this.options);
+ paimonOptions.put(CoreOptions.BUCKET.key(), "-1");
+
+ Schema.Builder schemaBuilder =
+ Schema.newBuilder()
+ .comment(hiveTableOptions.get("comment"))
+ .options(paimonOptions)
+ .partitionKeys(
+ partitionFields.stream()
+ .map(FieldSchema::getName)
+ .collect(Collectors.toList()));
+
+ fields.forEach(
+ field ->
+ schemaBuilder.column(
+ field.getName(),
+ toPaimonType(field.getType()),
+ field.getComment()));
+
+ return schemaBuilder.build();
+ }
+
+ private List<MigrateTask> importPartitionedTableTask(
+ IMetaStoreClient client,
+ FileIO fileIO,
+ List<String> partitionNames,
+ Table sourceTable,
+ AbstractFileStoreTable paimonTable,
+ Map<Path, Path> rollback)
+ throws Exception {
+ List<MigrateTask> migrateTasks = new ArrayList<>();
+ List<BinaryWriter.ValueSetter> valueSetters = new ArrayList<>();
+
+ RowType partitionRowType =
+
paimonTable.schema().projectedLogicalRowType(paimonTable.schema().partitionKeys());
+
+ partitionRowType
+ .getFieldTypes()
+ .forEach(type ->
valueSetters.add(BinaryWriter.createValueSetter(type)));
+
+ for (String partitionName : partitionNames) {
+ Partition partition =
+ client.getPartition(
+ sourceTable.getDbName(),
sourceTable.getTableName(), partitionName);
+ Map<String, String> values =
client.partitionNameToSpec(partitionName);
+ String format =
parseFormat(partition.getSd().getSerdeInfo().toString());
+ String location = partition.getSd().getLocation();
+ BinaryRow partitionRow =
+ FileMetaUtils.writePartitionValue(partitionRowType,
values, valueSetters);
+ Path path =
paimonTable.store().pathFactory().bucketPath(partitionRow, 0);
+
+ migrateTasks.add(
+ new MigrateTask(
+ fileIO, format, location, paimonTable,
partitionRow, path, rollback));
+ }
+ return migrateTasks;
+ }
+
+ public MigrateTask importUnPartitionedTableTask(
+ FileIO fileIO,
+ Table sourceTable,
+ AbstractFileStoreTable paimonTable,
+ Map<Path, Path> rollback) {
+ String format =
parseFormat(sourceTable.getSd().getSerdeInfo().toString());
+ String location = sourceTable.getSd().getLocation();
+ Path path =
paimonTable.store().pathFactory().bucketPath(BinaryRow.EMPTY_ROW, 0);
+ return new MigrateTask(
+ fileIO, format, location, paimonTable, BinaryRow.EMPTY_ROW,
path, rollback);
+ }
+
+ private void checkCompatible(Table sourceHiveTable, AbstractFileStoreTable
paimonTable) {
+ List<FieldSchema> sourceFields = new
ArrayList<>(sourceHiveTable.getPartitionKeys());
+ List<DataField> targetFields =
+ new ArrayList<>(
+ paimonTable
+ .schema()
+
.projectedLogicalRowType(paimonTable.partitionKeys())
+ .getFields());
+
+ if (sourceFields.size() != targetFields.size()) {
+ throw new RuntimeException(
+ "Source table partition keys not match target table
partition keys.");
+ }
+
+ sourceFields.sort(Comparator.comparing(FieldSchema::getName));
+ targetFields.sort(Comparator.comparing(DataField::name));
+
+ for (int i = 0; i < sourceFields.size(); i++) {
+ FieldSchema s = sourceFields.get(i);
+ DataField t = targetFields.get(i);
+
+ if (!s.getName().equals(t.name())
+ || !s.getType().equalsIgnoreCase(t.type().asSQLString())) {
+ throw new RuntimeException(
+ "Source table partition keys not match target table
partition keys, please checkCompatible.");
+ }
+ }
+ }
+
+ private String parseFormat(String serder) {
+ if (serder.contains("avro")) {
+ throw new UnsupportedOperationException("Can't support format avro
yet.");
+ } else if (serder.contains("parquet")) {
+ return "parquet";
+ } else if (serder.contains("orc")) {
+ return "orc";
+ } else {
+ throw new UnsupportedOperationException("Unknown partition format:
" + serder);
+ }
+ }
+
+ /** One import task for one partition. */
+ public static class MigrateTask implements Supplier<CommitMessage> {
+
+ private final FileIO fileIO;
+ private final String format;
+ private final String location;
+ private final AbstractFileStoreTable paimonTable;
+ private final BinaryRow partitionRow;
+ private final Path newDir;
+ private final Map<Path, Path> rollback;
+
+ public MigrateTask(
+ FileIO fileIO,
+ String format,
+ String location,
+ AbstractFileStoreTable paimonTable,
+ BinaryRow partitionRow,
+ Path newDir,
+ Map<Path, Path> rollback) {
+ this.fileIO = fileIO;
+ this.format = format;
+ this.location = location;
+ this.paimonTable = paimonTable;
+ this.partitionRow = partitionRow;
+ this.newDir = newDir;
+ this.rollback = rollback;
+ }
+
+ @Override
+ public CommitMessage get() {
+ try {
+ List<DataFileMeta> fileMetas =
+ FileMetaUtils.construct(
+ fileIO,
+ format,
+ location,
+ paimonTable,
+ HIDDEN_PATH_FILTER,
+ newDir,
+ rollback);
+ return FileMetaUtils.commitFile(partitionRow, fileMetas);
+ } catch (IOException e) {
+ throw new RuntimeException("Can't get commit message", e);
+ }
+ }
+ }
+}
diff --git
a/paimon-hive/paimon-hive-common/src/test/java/org/apache/paimon/hive/TestHiveMetastore.java
b/paimon-hive/paimon-hive-common/src/test/java/org/apache/paimon/hive/TestHiveMetastore.java
index 16520281c..5b6ea9933 100644
---
a/paimon-hive/paimon-hive-common/src/test/java/org/apache/paimon/hive/TestHiveMetastore.java
+++
b/paimon-hive/paimon-hive-common/src/test/java/org/apache/paimon/hive/TestHiveMetastore.java
@@ -62,27 +62,31 @@ import static
java.nio.file.attribute.PosixFilePermissions.fromString;
*/
public class TestHiveMetastore {
- private static final int DEFAULT_POOL_SIZE = 5;
+ private static final int DEFAULT_POOL_SIZE = 15;
// It's tricky to clear all static fields in an HMS instance in order to
switch derby root dir.
// Therefore, we reuse the same derby root between tests and remove it
after JVM exits.
- private static final File HIVE_LOCAL_DIR;
- private static final String DERBY_PATH;
+ private static File hiveLocalDir;
+ private static String derbyPath;
static {
+ setup();
+ }
+
+ private static void setup() {
try {
- HIVE_LOCAL_DIR =
+ hiveLocalDir =
createTempDirectory("hive",
asFileAttribute(fromString("rwxrwxrwx"))).toFile();
- DERBY_PATH = new File(HIVE_LOCAL_DIR, "metastore_db").getPath();
- File derbyLogFile = new File(HIVE_LOCAL_DIR, "derby.log");
+ derbyPath = new File(hiveLocalDir, "metastore_db").getPath();
+ File derbyLogFile = new File(hiveLocalDir, "derby.log");
System.setProperty("derby.stream.error.file",
derbyLogFile.getAbsolutePath());
- setupMetastoreDB("jdbc:derby:" + DERBY_PATH + ";create=true");
+ setupMetastoreDB("jdbc:derby:" + derbyPath + ";create=true");
Runtime.getRuntime()
.addShutdownHook(
new Thread(
() -> {
Path localDirPath =
- new
Path(HIVE_LOCAL_DIR.getAbsolutePath());
+ new
Path(hiveLocalDir.getAbsolutePath());
FileSystem fs;
try {
fs =
@@ -110,12 +114,20 @@ public class TestHiveMetastore {
private TServer server;
private HiveMetaStore.HMSHandler baseHandler;
+ /**
+ * Starts a TestHiveMetastore with the default connection pool size (5)
and the default
+ * HiveConf.
+ */
+ public void start(int port) {
+ start(new HiveConf(new Configuration(), TestHiveMetastore.class),
DEFAULT_POOL_SIZE, port);
+ }
+
/**
* Starts a TestHiveMetastore with the default connection pool size (5)
and the default
* HiveConf.
*/
public void start() {
- start(new HiveConf(new Configuration(), TestHiveMetastore.class),
DEFAULT_POOL_SIZE);
+ start(new HiveConf(new Configuration(), TestHiveMetastore.class),
DEFAULT_POOL_SIZE, 9083);
}
/**
@@ -124,9 +136,9 @@ public class TestHiveMetastore {
* @param conf The hive configuration to use
* @param poolSize The number of threads in the executor pool
*/
- public void start(HiveConf conf, int poolSize) {
+ public void start(HiveConf conf, int poolSize, int portNum) {
try {
- TServerSocket socket = new TServerSocket(9083);
+ TServerSocket socket = new TServerSocket(portNum);
int port = socket.getServerSocket().getLocalPort();
initConf(conf, port);
@@ -141,6 +153,7 @@ public class TestHiveMetastore {
System.setProperty(
HiveConf.ConfVars.METASTOREURIS.varname,
hiveConf.getVar(HiveConf.ConfVars.METASTOREURIS));
+ System.setProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname,
warehouseDir());
} catch (Exception e) {
throw new RuntimeException("Cannot start TestHiveMetastore", e);
}
@@ -157,10 +170,13 @@ public class TestHiveMetastore {
if (baseHandler != null) {
baseHandler.shutdown();
}
+ System.clearProperty(HiveConf.ConfVars.METASTOREURIS.varname);
+ System.clearProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname);
}
public void reset() throws Exception {
- Path warehouseRoot = new Path(HIVE_LOCAL_DIR.getAbsolutePath());
+ setup();
+ Path warehouseRoot = new Path(hiveLocalDir.getAbsolutePath());
FileSystem fs = FileSystem.get(warehouseRoot.toUri(), hiveConf);
for (FileStatus fileStatus : fs.listStatus(warehouseRoot)) {
if (!fileStatus.getPath().getName().equals("derby.log")
@@ -170,12 +186,16 @@ public class TestHiveMetastore {
}
}
+ private static String warehouseDir() {
+ return "file:" + hiveLocalDir.getAbsolutePath();
+ }
+
private TServer newThriftServer(TServerSocket socket, int poolSize,
HiveConf conf)
throws Exception {
HiveConf serverConf = new HiveConf(conf);
serverConf.set(
HiveConf.ConfVars.METASTORECONNECTURLKEY.varname,
- "jdbc:derby:" + DERBY_PATH + ";create=true");
+ "jdbc:derby:" + derbyPath + ";create=true");
baseHandler = new HiveMetaStore.HMSHandler("new db based metaserver",
serverConf);
IHMSHandler handler = RetryingHMSHandler.getProxy(serverConf,
baseHandler, false);
@@ -192,9 +212,7 @@ public class TestHiveMetastore {
private void initConf(HiveConf conf, int port) {
conf.set(HiveConf.ConfVars.METASTOREURIS.varname,
"thrift://localhost:" + port);
- conf.set(
- HiveConf.ConfVars.METASTOREWAREHOUSE.varname,
- "file:" + HIVE_LOCAL_DIR.getAbsolutePath());
+ conf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, warehouseDir());
conf.set(HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL.varname, "false");
conf.set(
HiveConf.ConfVars.METASTORE_DISALLOW_INCOMPATIBLE_COL_TYPE_CHANGES.varname,
diff --git a/paimon-hive/paimon-hive-connector-common/pom.xml
b/paimon-hive/paimon-hive-connector-common/pom.xml
index b6a84361d..0f1e86cdc 100644
--- a/paimon-hive/paimon-hive-connector-common/pom.xml
+++ b/paimon-hive/paimon-hive-connector-common/pom.xml
@@ -115,6 +115,22 @@ under the License.
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-flink-common</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-hive-common</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>org.apache.paimon</groupId>
<artifactId>paimon-hive-catalog</artifactId>
diff --git
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureTest.java
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureTest.java
new file mode 100644
index 000000000..7fa93a9b7
--- /dev/null
+++
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.hive.procedure;
+
+import org.apache.paimon.flink.action.ActionITCaseBase;
+import org.apache.paimon.flink.procedure.MigrateFileProcedure;
+import org.apache.paimon.hive.TestHiveMetastore;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.SqlDialect;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Random;
+
+/** Tests for {@link MigrateFileProcedure}. */
+public class MigrateFileProcedureTest extends ActionITCaseBase {
+
+ private static final TestHiveMetastore TEST_HIVE_METASTORE = new
TestHiveMetastore();
+
+ private static final int PORT = 9085;
+
+ @BeforeEach
+ public void beforeEach() {
+ TEST_HIVE_METASTORE.start(PORT);
+ }
+
+ @AfterEach
+ public void afterEach() throws Exception {
+ TEST_HIVE_METASTORE.stop();
+ }
+
+ @Test
+ public void testOrc() throws Exception {
+ test("orc");
+ }
+
+ @Test
+ public void testParquet() throws Exception {
+ test("parquet");
+ }
+
+ public void test(String format) throws Exception {
+ StreamExecutionEnvironment env = buildDefaultEnv(false);
+
+ TableEnvironment tEnv =
+ StreamTableEnvironment.create(env,
EnvironmentSettings.inBatchMode());
+ tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')");
+ tEnv.useCatalog("HIVE");
+ tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
+ tEnv.executeSql(
+ "CREATE TABLE hivetable (id string) PARTITIONED BY (id2 int,
id3 int) STORED AS "
+ + format);
+ tEnv.executeSql("INSERT INTO hivetable VALUES" + data(100)).await();
+ tEnv.executeSql("SHOW CREATE TABLE hivetable");
+
+ tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
+ tEnv.executeSql("CREATE CATALOG PAIMON_GE WITH
('type'='paimon-generic')");
+ tEnv.useCatalog("PAIMON_GE");
+ List<Row> r1 = ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM
hivetable").collect());
+
+ tEnv.executeSql(
+ "CREATE CATALOG PAIMON WITH ('type'='paimon', 'metastore' =
'hive', 'uri' = 'thrift://localhost:"
+ + PORT
+ + "' , 'warehouse' = '"
+ +
System.getProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname)
+ + "')");
+ tEnv.useCatalog("PAIMON");
+ tEnv.executeSql(
+ "CREATE TABLE paimontable (id STRING, id2 INT, id3 INT)
PARTITIONED BY (id2, id3) with ('bucket' = '-1');");
+ tEnv.executeSql("CALL sys.migrate_file('hive', 'default.hivetable',
'default.paimontable')")
+ .await();
+ List<Row> r2 = ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM
paimontable").collect());
+
+ Assertions.assertThatList(r1).containsExactlyInAnyOrderElementsOf(r2);
+ }
+
+ private String data(int i) {
+ Random random = new Random();
+ StringBuilder stringBuilder = new StringBuilder();
+ for (int m = 0; m < i; m++) {
+ stringBuilder.append("(");
+ stringBuilder.append("\"");
+ stringBuilder.append('a' + m);
+ stringBuilder.append("\",");
+ stringBuilder.append(random.nextInt(10));
+ stringBuilder.append(",");
+ stringBuilder.append(random.nextInt(10));
+ stringBuilder.append(")");
+ if (m != i - 1) {
+ stringBuilder.append(",");
+ }
+ }
+ return stringBuilder.toString();
+ }
+}
diff --git
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureTest.java
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureTest.java
new file mode 100644
index 000000000..a7630a8fe
--- /dev/null
+++
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureTest.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.hive.procedure;
+
+import org.apache.paimon.flink.action.ActionITCaseBase;
+import org.apache.paimon.flink.action.MigrateTableAction;
+import org.apache.paimon.flink.procedure.MigrateFileProcedure;
+import org.apache.paimon.hive.TestHiveMetastore;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.SqlDialect;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+/** Tests for {@link MigrateFileProcedure}. */
+public class MigrateTableProcedureTest extends ActionITCaseBase {
+
+ private static final TestHiveMetastore TEST_HIVE_METASTORE = new
TestHiveMetastore();
+
+ private static final int PORT = 9084;
+
+ @BeforeEach
+ public void beforeEach() {
+ TEST_HIVE_METASTORE.start(PORT);
+ }
+
+ @AfterEach
+ public void afterEach() throws Exception {
+ TEST_HIVE_METASTORE.stop();
+ }
+
+ @Test
+ public void testOrc() throws Exception {
+ testUpgradeNonPartitionTable("orc");
+ resetMetastore();
+ testUpgradePartitionTable("orc");
+ }
+
+ @Test
+ public void testParquetNonPartitionTable() throws Exception {
+ testUpgradeNonPartitionTable("parquet");
+ resetMetastore();
+ testUpgradePartitionTable("parquet");
+ }
+
+ private void resetMetastore() throws Exception {
+ TEST_HIVE_METASTORE.stop();
+ TEST_HIVE_METASTORE.reset();
+ TEST_HIVE_METASTORE.start(PORT);
+ }
+
+ public void testUpgradePartitionTable(String format) throws Exception {
+ StreamExecutionEnvironment env = buildDefaultEnv(false);
+
+ TableEnvironment tEnv =
+ StreamTableEnvironment.create(env,
EnvironmentSettings.inBatchMode());
+ tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')");
+ tEnv.useCatalog("HIVE");
+ tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
+ tEnv.executeSql(
+ "CREATE TABLE hivetable (id string) PARTITIONED BY (id2 int,
id3 int) STORED AS "
+ + format);
+ tEnv.executeSql("INSERT INTO hivetable VALUES" + data(100)).await();
+ tEnv.executeSql("SHOW CREATE TABLE hivetable");
+
+ tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
+ tEnv.executeSql("CREATE CATALOG PAIMON_GE WITH
('type'='paimon-generic')");
+ tEnv.useCatalog("PAIMON_GE");
+ List<Row> r1 = ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM
hivetable").collect());
+
+ tEnv.executeSql(
+ "CREATE CATALOG PAIMON WITH ('type'='paimon', 'metastore' =
'hive', 'uri' = 'thrift://localhost:"
+ + PORT
+ + "' , 'warehouse' = '"
+ +
System.getProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname)
+ + "')");
+ tEnv.useCatalog("PAIMON");
+ tEnv.executeSql(
+ "CALL sys.migrate_table('hive', 'default.hivetable',
'file.format="
+ + format
+ + "')")
+ .await();
+ List<Row> r2 = ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM
hivetable").collect());
+
+ Assertions.assertThatList(r1).containsExactlyInAnyOrderElementsOf(r2);
+ }
+
+ public void testUpgradeNonPartitionTable(String format) throws Exception {
+ StreamExecutionEnvironment env = buildDefaultEnv(false);
+
+ TableEnvironment tEnv =
+ StreamTableEnvironment.create(env,
EnvironmentSettings.inBatchMode());
+ tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')");
+ tEnv.useCatalog("HIVE");
+ tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
+ tEnv.executeSql("CREATE TABLE hivetable (id string, id2 int, id3 int)
STORED AS " + format);
+ tEnv.executeSql("INSERT INTO hivetable VALUES" + data(100)).await();
+ tEnv.executeSql("SHOW CREATE TABLE hivetable");
+
+ tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
+ tEnv.executeSql("CREATE CATALOG PAIMON_GE WITH
('type'='paimon-generic')");
+ tEnv.useCatalog("PAIMON_GE");
+ List<Row> r1 = ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM
hivetable").collect());
+
+ tEnv.executeSql(
+ "CREATE CATALOG PAIMON WITH ('type'='paimon', 'metastore' =
'hive', 'uri' = 'thrift://localhost:"
+ + PORT
+ + "' , 'warehouse' = '"
+ +
System.getProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname)
+ + "')");
+ tEnv.useCatalog("PAIMON");
+ tEnv.executeSql(
+ "CALL sys.migrate_table('hive', 'default.hivetable',
'file.format="
+ + format
+ + "')")
+ .await();
+ List<Row> r2 = ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM
hivetable").collect());
+
+ Assertions.assertThatList(r1).containsExactlyInAnyOrderElementsOf(r2);
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = {"orc", "parquet"})
+ public void testMigrateAction(String format) throws Exception {
+ StreamExecutionEnvironment env = buildDefaultEnv(false);
+
+ TableEnvironment tEnv =
+ StreamTableEnvironment.create(env,
EnvironmentSettings.inBatchMode());
+ tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')");
+ tEnv.useCatalog("HIVE");
+ tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
+ tEnv.executeSql(
+ "CREATE TABLE hivetable (id string) PARTITIONED BY (id2 int,
id3 int) STORED AS "
+ + format);
+ tEnv.executeSql("INSERT INTO hivetable VALUES" + data(100)).await();
+ tEnv.executeSql("SHOW CREATE TABLE hivetable");
+
+ tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
+ tEnv.executeSql("CREATE CATALOG PAIMON_GE WITH
('type'='paimon-generic')");
+ tEnv.useCatalog("PAIMON_GE");
+ List<Row> r1 = ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM
hivetable").collect());
+ Map<String, String> catalogConf = new HashMap<>();
+ catalogConf.put("metastore", "hive");
+ catalogConf.put("uri", "thrift://localhost:" + PORT);
+ MigrateTableAction migrateTableAction =
+ new MigrateTableAction(
+ "hive",
+
System.getProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname),
+ "default.hivetable",
+ catalogConf,
+ "");
+ migrateTableAction.run();
+
+ tEnv.executeSql(
+ "CREATE CATALOG PAIMON WITH ('type'='paimon', 'metastore' =
'hive', 'uri' = 'thrift://localhost:"
+ + PORT
+ + "' , 'warehouse' = '"
+ +
System.getProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname)
+ + "')");
+ tEnv.useCatalog("PAIMON");
+ List<Row> r2 = ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM
hivetable").collect());
+
+ Assertions.assertThatList(r1).containsExactlyInAnyOrderElementsOf(r2);
+ }
+
+ private String data(int i) {
+ Random random = new Random();
+ StringBuilder stringBuilder = new StringBuilder();
+ for (int m = 0; m < i; m++) {
+ stringBuilder.append("(");
+ stringBuilder.append("\"");
+ stringBuilder.append('a' + m);
+ stringBuilder.append("\",");
+ stringBuilder.append(random.nextInt(10));
+ stringBuilder.append(",");
+ stringBuilder.append(random.nextInt(10));
+ stringBuilder.append(")");
+ if (m != i - 1) {
+ stringBuilder.append(",");
+ }
+ }
+ return stringBuilder.toString();
+ }
+}