This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 4d237c51a [hive] migrate_file support save source table (#3780)
4d237c51a is described below
commit 4d237c51a80ac7892a95c7f19141249dcceed36a
Author: xuzifu666 <[email protected]>
AuthorDate: Mon Jul 22 11:30:48 2024 +0800
[hive] migrate_file support save source table (#3780)
---
docs/content/spark/procedures.md | 11 ++++
.../flink/procedure/MigrateFileProcedure.java | 31 +++++++--
.../hive/procedure/MigrateFileProcedureITCase.java | 35 ++++++++++
.../spark/procedure/MigrateFileProcedure.java | 10 ++-
.../spark/procedure/MigrateFileProcedureTest.scala | 76 ++++++++++++++++++++++
5 files changed, 156 insertions(+), 7 deletions(-)
diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md
index d9a744cdd..3f9b3de85 100644
--- a/docs/content/spark/procedures.md
+++ b/docs/content/spark/procedures.md
@@ -126,6 +126,17 @@ This section introduce all available spark procedures
about paimon.
</td>
<td>CALL sys.migrate_table(source_type => 'hive', table => 'default.T',
options => 'file.format=parquet')</td>
</tr>
+ <tr>
+ <td>migrate_file</td>
+ <td>
+ Migrate the data files of a hive table into an existing paimon table. Arguments:
+ <li>source_type: the origin table's type to be migrated, such as
hive. Cannot be empty.</li>
+ <li>source_table: name of the origin table to migrate. Cannot be
empty.</li>
+ <li>target_table: name of the target paimon table to migrate into. Cannot
be empty.</li>
+ <li>delete_origin: whether to delete the origin table's metadata
from HMS after the migration. Optional;
defaults to true.</li>
+ </td>
+ <td>CALL sys.migrate_file(source_type => 'hive', source_table => 'default.T',
target_table => 'default.paimon_T', delete_origin => true)</td>
+ </tr>
<tr>
<td>remove_orphan_files</td>
<td>
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java
index c3e197291..c9a273336 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink.procedure;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.utils.TableMigrationUtils;
import org.apache.paimon.hive.HiveCatalog;
+import org.apache.paimon.migrate.Migrator;
import org.apache.flink.table.procedure.ProcedureContext;
@@ -40,6 +41,27 @@ public class MigrateFileProcedure extends ProcedureBase {
String sourceTablePath,
String targetPaimonTablePath)
throws Exception {
+ call(procedureContext, connector, sourceTablePath,
targetPaimonTablePath, true);
+ return new String[] {"Success"};
+ }
+
+ public String[] call(
+ ProcedureContext procedureContext,
+ String connector,
+ String sourceTablePath,
+ String targetPaimonTablePath,
+ boolean deleteOrigin)
+ throws Exception {
+ migrateHandle(connector, sourceTablePath, targetPaimonTablePath,
deleteOrigin);
+ return new String[] {"Success"};
+ }
+
+ public void migrateHandle(
+ String connector,
+ String sourceTablePath,
+ String targetPaimonTablePath,
+ boolean deleteOrigin)
+ throws Exception {
if (!(catalog instanceof HiveCatalog)) {
throw new IllegalArgumentException("Only support Hive Catalog");
}
@@ -51,15 +73,16 @@ public class MigrateFileProcedure extends ProcedureBase {
"Target paimon table does not exist: " +
targetPaimonTablePath);
}
- TableMigrationUtils.getImporter(
+ Migrator importer =
+ TableMigrationUtils.getImporter(
connector,
(HiveCatalog) catalog,
sourceTableId.getDatabaseName(),
sourceTableId.getObjectName(),
targetTableId.getDatabaseName(),
targetTableId.getObjectName(),
- Collections.emptyMap())
- .executeMigrate();
- return new String[] {"Success"};
+ Collections.emptyMap());
+ importer.deleteOriginTable(deleteOrigin);
+ importer.executeMigrate();
}
}
diff --git
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java
index b71a38c59..19b5a4f97 100644
---
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java
+++
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java
@@ -56,16 +56,19 @@ public class MigrateFileProcedureITCase extends
ActionITCaseBase {
@Test
public void testOrc() throws Exception {
test("orc");
+ testWithDeleteOrigin("orc");
}
@Test
public void testAvro() throws Exception {
test("avro");
+ testWithDeleteOrigin("avro");
}
@Test
public void testParquet() throws Exception {
test("parquet");
+ testWithDeleteOrigin("parquet");
}
public void test(String format) throws Exception {
@@ -100,6 +103,38 @@ public class MigrateFileProcedureITCase extends
ActionITCaseBase {
Assertions.assertThatList(r1).containsExactlyInAnyOrderElementsOf(r2);
}
+ /**
+ * Migrates the files of a partitioned Hive table into an existing Paimon table with
+ * {@code delete_origin = false}, then checks that querying the Hive table returns no rows.
+ *
+ * @param format Hive storage format placed in the {@code STORED AS} clause
+ * @throws Exception if any SQL statement or the migrate_file call fails
+ */
+ public void testWithDeleteOrigin(String format) throws Exception {
+ TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
+ tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')");
+ tEnv.useCatalog("HIVE");
+ tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
+ tEnv.executeSql(
+ "CREATE TABLE hivetable01 (id string) PARTITIONED BY (id2 int,
id3 int) STORED AS "
+ + format);
+ tEnv.executeSql("INSERT INTO hivetable01 VALUES" + data(100)).await();
+ tEnv.executeSql("SHOW CREATE TABLE hivetable01");
+
+ tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
+ tEnv.executeSql("CREATE CATALOG PAIMON_GE WITH
('type'='paimon-generic')");
+ tEnv.useCatalog("PAIMON_GE");
+
+ tEnv.executeSql(
+ "CREATE CATALOG PAIMON WITH ('type'='paimon', 'metastore' =
'hive', 'uri' = 'thrift://localhost:"
+ + PORT
+ + "' , 'warehouse' = '"
+ +
System.getProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname)
+ + "')");
+ tEnv.useCatalog("PAIMON");
+ tEnv.executeSql(
+ "CREATE TABLE paimontable01 (id STRING, id2 INT, id3 INT)
PARTITIONED BY (id2, id3) with ('bucket' = '-1');");
+ tEnv.executeSql(
+ "CALL sys.migrate_file('hive', 'default.hivetable01',
'default.paimontable01', false)")
+ .await();
+ tEnv.useCatalog("HIVE");
+ List<Row> r1 = ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM
hivetable01").collect());
+ // assertThat(boolean) with no follow-up check verifies nothing; assert on the list itself.
+ Assertions.assertThatList(r1).isEmpty();
+ }
+
private String data(int i) {
Random random = new Random();
StringBuilder stringBuilder = new StringBuilder();
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateFileProcedure.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateFileProcedure.java
index 177ad5c2b..8f4850fef 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateFileProcedure.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateFileProcedure.java
@@ -26,13 +26,13 @@ import org.apache.paimon.spark.utils.TableMigrationUtils;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.TableCatalog;
-import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import java.util.Collections;
+import static org.apache.spark.sql.types.DataTypes.BooleanType;
import static org.apache.spark.sql.types.DataTypes.StringType;
/**
@@ -48,13 +48,14 @@ public class MigrateFileProcedure extends BaseProcedure {
new ProcedureParameter[] {
ProcedureParameter.required("source_type", StringType),
ProcedureParameter.required("source_table", StringType),
- ProcedureParameter.required("target_table", StringType)
+ ProcedureParameter.required("target_table", StringType),
+ ProcedureParameter.optional("delete_origin", BooleanType)
};
private static final StructType OUTPUT_TYPE =
new StructType(
new StructField[] {
- new StructField("result", DataTypes.BooleanType, true,
Metadata.empty())
+ new StructField("result", BooleanType, true,
Metadata.empty())
});
protected MigrateFileProcedure(TableCatalog tableCatalog) {
@@ -76,6 +77,7 @@ public class MigrateFileProcedure extends BaseProcedure {
String format = args.getString(0);
String sourceTable = args.getString(1);
String targetTable = args.getString(2);
+ boolean deleteNeed = args.isNullAt(3) ? true : args.getBoolean(3);
Identifier sourceTableId = Identifier.fromString(sourceTable);
Identifier targetTableId = Identifier.fromString(targetTable);
@@ -97,6 +99,8 @@ public class MigrateFileProcedure extends BaseProcedure {
targetTableId.getDatabaseName(),
targetTableId.getObjectName(),
Collections.emptyMap());
+
+ migrator.deleteOriginTable(deleteNeed);
migrator.executeMigrate();
} catch (Exception e) {
throw new RuntimeException("Call migrate_file error", e);
diff --git
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/MigrateFileProcedureTest.scala
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/MigrateFileProcedureTest.scala
index 90428b331..ceb0879cb 100644
---
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/MigrateFileProcedureTest.scala
+++
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/MigrateFileProcedureTest.scala
@@ -57,6 +57,43 @@ class MigrateFileProcedureTest extends PaimonHiveTestBase {
}
})
+ Seq("parquet", "orc", "avro").foreach(
+ format => {
+ test(
+ s"Paimon migrate file procedure: migrate $format non-partitioned table
with delete source table") {
+ withTable("hive_tbl", "paimon_tbl") {
+ // create hive table
+ spark.sql(s"""
+ |CREATE TABLE hive_tbl (id STRING, name STRING, pt
STRING)
+ |USING $format
+ |""".stripMargin)
+
+ spark.sql(s"INSERT INTO hive_tbl VALUES ('1', 'a', 'p1'), ('2', 'b',
'p2')")
+
+ // create paimon table
+ spark.sql(s"""
+ |CREATE TABLE paimon_tbl (id STRING, name STRING, pt
STRING)
+ |USING PAIMON
+ |TBLPROPERTIES ('file.format'='$format', 'bucket'='-1')
+ |""".stripMargin)
+
+ spark.sql(s"INSERT INTO paimon_tbl VALUES ('3', 'c', 'p1'), ('4',
'd', 'p2')")
+
+ spark.sql(
+ s"CALL sys.migrate_file(source_type => 'hive', source_table =>
'$hiveDbName.hive_tbl', target_table => '$hiveDbName.paimon_tbl', delete_origin
=> false)")
+
+ checkAnswer(spark.sql("SELECT * FROM hive_tbl ORDER BY id"), Nil)
+
+ checkAnswer(
+ spark.sql("SELECT * FROM paimon_tbl ORDER BY id"),
+ Row("1", "a", "p1") :: Row("2", "b", "p2") :: Row("3", "c", "p1")
:: Row(
+ "4",
+ "d",
+ "p2") :: Nil)
+ }
+ }
+ })
+
Seq("parquet", "orc", "avro").foreach(
format => {
test(s"Paimon migrate file procedure: migrate $format partitioned
table") {
@@ -92,4 +129,43 @@ class MigrateFileProcedureTest extends PaimonHiveTestBase {
}
}
})
+
+ Seq("parquet", "orc", "avro").foreach(
+ format => {
+ test(
+ s"Paimon migrate file procedure: migrate $format partitioned table
with delete source table") {
+ withTable("hive_tbl", "paimon_tbl") {
+ // create hive table
+ spark.sql(s"""
+ |CREATE TABLE hive_tbl (id STRING, name STRING, pt
STRING)
+ |USING $format
+ |PARTITIONED BY (pt)
+ |""".stripMargin)
+
+ spark.sql(s"INSERT INTO hive_tbl VALUES ('1', 'a', 'p1'), ('2', 'b',
'p2')")
+
+ // create paimon table
+ spark.sql(s"""
+ |CREATE TABLE paimon_tbl (id STRING, name STRING, pt
STRING)
+ |USING PAIMON
+ |TBLPROPERTIES ('file.format'='$format', 'bucket'='-1')
+ |PARTITIONED BY (pt)
+ |""".stripMargin)
+
+ spark.sql(s"INSERT INTO paimon_tbl VALUES ('3', 'c', 'p1'), ('4',
'd', 'p2')")
+
+ spark.sql(
+ s"CALL sys.migrate_file(source_type => 'hive', source_table =>
'$hiveDbName.hive_tbl', target_table => '$hiveDbName.paimon_tbl', delete_origin
=> false)")
+
+ checkAnswer(
+ spark.sql("SELECT * FROM paimon_tbl ORDER BY id"),
+ Row("1", "a", "p1") :: Row("2", "b", "p2") :: Row("3", "c", "p1")
:: Row(
+ "4",
+ "d",
+ "p2") :: Nil)
+
+ checkAnswer(spark.sql("SELECT * FROM hive_tbl ORDER BY id"), Nil)
+ }
+ }
+ })
}