This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 4d237c51a [hive] migrate_file support save source table (#3780)
4d237c51a is described below

commit 4d237c51a80ac7892a95c7f19141249dcceed36a
Author: xuzifu666 <[email protected]>
AuthorDate: Mon Jul 22 11:30:48 2024 +0800

    [hive] migrate_file support save source table (#3780)
---
 docs/content/spark/procedures.md                   | 11 ++++
 .../flink/procedure/MigrateFileProcedure.java      | 31 +++++++--
 .../hive/procedure/MigrateFileProcedureITCase.java | 35 ++++++++++
 .../spark/procedure/MigrateFileProcedure.java      | 10 ++-
 .../spark/procedure/MigrateFileProcedureTest.scala | 76 ++++++++++++++++++++++
 5 files changed, 156 insertions(+), 7 deletions(-)

diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md
index d9a744cdd..3f9b3de85 100644
--- a/docs/content/spark/procedures.md
+++ b/docs/content/spark/procedures.md
@@ -126,6 +126,17 @@ This section introduce all available spark procedures 
about paimon.
       </td>
       <td>CALL sys.migrate_table(source_type => 'hive', table => 'default.T', 
options => 'file.format=parquet')</td>
     </tr>
+    <tr>
+      <td>migrate_file</td>
+      <td>
+         Migrate the data files of a hive table into an existing paimon table. Arguments:
+            <li>source_type: the origin table's type to be migrated, such as 
hive. Cannot be empty.</li>
+            <li>source_table: name of the origin table to migrate. Cannot be 
empty.</li>
+            <li>target_table: name of the existing paimon table to migrate the files into. Cannot be empty.</li>
+            <li>delete_origin: whether to delete the origin table's metadata from the Hive metastore after the migration. Default is true.</li>
+      </td>
+      <td>CALL sys.migrate_file(source_type => 'hive', source_table => 'default.hive_T', target_table => 'default.paimon_T', delete_origin => false)</td>
+    </tr>
     <tr>
       <td>remove_orphan_files</td>
       <td>
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java
index c3e197291..c9a273336 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink.procedure;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.utils.TableMigrationUtils;
 import org.apache.paimon.hive.HiveCatalog;
+import org.apache.paimon.migrate.Migrator;
 
 import org.apache.flink.table.procedure.ProcedureContext;
 
@@ -40,6 +41,27 @@ public class MigrateFileProcedure extends ProcedureBase {
             String sourceTablePath,
             String targetPaimonTablePath)
             throws Exception {
+        call(procedureContext, connector, sourceTablePath, 
targetPaimonTablePath, true);
+        return new String[] {"Success"};
+    }
+
+    /**
+     * Migrates the files of the source table into the Paimon table {@code targetPaimonTablePath},
+     * which must already exist.
+     *
+     * @param procedureContext Flink procedure context (not used by this procedure)
+     * @param connector source connector type, e.g. {@code hive}
+     * @param sourceTablePath source table identifier in {@code database.table} form
+     * @param targetPaimonTablePath target Paimon table identifier in {@code database.table} form
+     * @param deleteOrigin whether the origin table is deleted after the migration; forwarded to
+     *     {@code Migrator#deleteOriginTable}
+     * @return a single-element array holding {@code "Success"}
+     */
+    public String[] call(
+            ProcedureContext procedureContext,
+            String connector,
+            String sourceTablePath,
+            String targetPaimonTablePath,
+            boolean deleteOrigin)
+            throws Exception {
+        migrateHandle(connector, sourceTablePath, targetPaimonTablePath, deleteOrigin);
+        final String[] result = {"Success"};
+        return result;
+    }
+
+    public void migrateHandle(
+            String connector,
+            String sourceTablePath,
+            String targetPaimonTablePath,
+            boolean deleteOrigin)
+            throws Exception {
         if (!(catalog instanceof HiveCatalog)) {
             throw new IllegalArgumentException("Only support Hive Catalog");
         }
@@ -51,15 +73,16 @@ public class MigrateFileProcedure extends ProcedureBase {
                     "Target paimon table does not exist: " + 
targetPaimonTablePath);
         }
 
-        TableMigrationUtils.getImporter(
+        Migrator importer =
+                TableMigrationUtils.getImporter(
                         connector,
                         (HiveCatalog) catalog,
                         sourceTableId.getDatabaseName(),
                         sourceTableId.getObjectName(),
                         targetTableId.getDatabaseName(),
                         targetTableId.getObjectName(),
-                        Collections.emptyMap())
-                .executeMigrate();
-        return new String[] {"Success"};
+                        Collections.emptyMap());
+        importer.deleteOriginTable(deleteOrigin);
+        importer.executeMigrate();
     }
 }
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java
 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java
index b71a38c59..19b5a4f97 100644
--- 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java
+++ 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java
@@ -56,16 +56,19 @@ public class MigrateFileProcedureITCase extends 
ActionITCaseBase {
     @Test
     public void testOrc() throws Exception {
+        // Pre-existing migration check, then a second migration that passes delete_origin = false.
         test("orc");
+        testWithDeleteOrigin("orc");
     }
 
     @Test
     public void testAvro() throws Exception {
+        // Pre-existing migration check, then a second migration that passes delete_origin = false.
         test("avro");
+        testWithDeleteOrigin("avro");
     }
 
     @Test
     public void testParquet() throws Exception {
+        // Pre-existing migration check, then a second migration that passes delete_origin = false.
         test("parquet");
+        testWithDeleteOrigin("parquet");
     }
 
     public void test(String format) throws Exception {
@@ -100,6 +103,38 @@ public class MigrateFileProcedureITCase extends 
ActionITCaseBase {
         Assertions.assertThatList(r1).containsExactlyInAnyOrderElementsOf(r2);
     }
 
+    public void testWithDeleteOrigin(String format) throws Exception {
+        TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
+        tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')");
+        tEnv.useCatalog("HIVE");
+        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
+        tEnv.executeSql(
+                "CREATE TABLE hivetable01 (id string) PARTITIONED BY (id2 int, 
id3 int) STORED AS "
+                        + format);
+        tEnv.executeSql("INSERT INTO hivetable01 VALUES" + data(100)).await();
+        tEnv.executeSql("SHOW CREATE TABLE hivetable01");
+
+        tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
+        tEnv.executeSql("CREATE CATALOG PAIMON_GE WITH 
('type'='paimon-generic')");
+        tEnv.useCatalog("PAIMON_GE");
+
+        tEnv.executeSql(
+                "CREATE CATALOG PAIMON WITH ('type'='paimon', 'metastore' = 
'hive', 'uri' = 'thrift://localhost:"
+                        + PORT
+                        + "' , 'warehouse' = '"
+                        + 
System.getProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname)
+                        + "')");
+        tEnv.useCatalog("PAIMON");
+        tEnv.executeSql(
+                "CREATE TABLE paimontable01 (id STRING, id2 INT, id3 INT) 
PARTITIONED BY (id2, id3) with ('bucket' = '-1');");
+        tEnv.executeSql(
+                        "CALL sys.migrate_file('hive', 'default.hivetable01', 
'default.paimontable01', false)")
+                .await();
+        tEnv.useCatalog("HIVE");
+        List<Row> r1 = ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM 
hivetable01").collect());
+        Assertions.assertThat(r1.size() == 0);
+    }
+
     private String data(int i) {
         Random random = new Random();
         StringBuilder stringBuilder = new StringBuilder();
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateFileProcedure.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateFileProcedure.java
index 177ad5c2b..8f4850fef 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateFileProcedure.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateFileProcedure.java
@@ -26,13 +26,13 @@ import org.apache.paimon.spark.utils.TableMigrationUtils;
 
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
-import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.Metadata;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 
 import java.util.Collections;
 
+import static org.apache.spark.sql.types.DataTypes.BooleanType;
 import static org.apache.spark.sql.types.DataTypes.StringType;
 
 /**
@@ -48,13 +48,14 @@ public class MigrateFileProcedure extends BaseProcedure {
             new ProcedureParameter[] {
                 ProcedureParameter.required("source_type", StringType),
                 ProcedureParameter.required("source_table", StringType),
-                ProcedureParameter.required("target_table", StringType)
+                ProcedureParameter.required("target_table", StringType),
+                ProcedureParameter.optional("delete_origin", BooleanType)
             };
 
     private static final StructType OUTPUT_TYPE =
+            // Output schema of migrate_file: a single nullable boolean column named "result".
             new StructType(
                     new StructField[] {
-                        new StructField("result", DataTypes.BooleanType, true, 
Metadata.empty())
+                        new StructField("result", BooleanType, true, 
Metadata.empty())
                     });
 
     protected MigrateFileProcedure(TableCatalog tableCatalog) {
@@ -76,6 +77,7 @@ public class MigrateFileProcedure extends BaseProcedure {
         String format = args.getString(0);
         String sourceTable = args.getString(1);
         String targetTable = args.getString(2);
+        boolean deleteNeed = args.isNullAt(3) ? true : args.getBoolean(3);
 
         Identifier sourceTableId = Identifier.fromString(sourceTable);
         Identifier targetTableId = Identifier.fromString(targetTable);
@@ -97,6 +99,8 @@ public class MigrateFileProcedure extends BaseProcedure {
                             targetTableId.getDatabaseName(),
                             targetTableId.getObjectName(),
                             Collections.emptyMap());
+
+            migrator.deleteOriginTable(deleteNeed);
             migrator.executeMigrate();
         } catch (Exception e) {
             throw new RuntimeException("Call migrate_file error", e);
diff --git 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/MigrateFileProcedureTest.scala
 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/MigrateFileProcedureTest.scala
index 90428b331..ceb0879cb 100644
--- 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/MigrateFileProcedureTest.scala
+++ 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/MigrateFileProcedureTest.scala
@@ -57,6 +57,43 @@ class MigrateFileProcedureTest extends PaimonHiveTestBase {
       }
     })
 
+  // Migration into an existing paimon table with delete_origin => false: the hive table metadata
+  // is kept, so hive_tbl is still queryable afterwards but holds no rows.
+  Seq("parquet", "orc", "avro").foreach(
+    format => {
+      test(
+        s"Paimon migrate file procedure: migrate $format non-partitioned table with delete_origin = false") {
+        withTable("hive_tbl", "paimon_tbl") {
+          // create hive table
+          spark.sql(s"""
+                       |CREATE TABLE hive_tbl (id STRING, name STRING, pt STRING)
+                       |USING $format
+                       |""".stripMargin)
+
+          spark.sql(s"INSERT INTO hive_tbl VALUES ('1', 'a', 'p1'), ('2', 'b', 'p2')")
+
+          // create paimon table
+          spark.sql(s"""
+                       |CREATE TABLE paimon_tbl (id STRING, name STRING, pt STRING)
+                       |USING PAIMON
+                       |TBLPROPERTIES ('file.format'='$format', 'bucket'='-1')
+                       |""".stripMargin)
+
+          spark.sql(s"INSERT INTO paimon_tbl VALUES ('3', 'c', 'p1'), ('4', 'd', 'p2')")
+
+          spark.sql(
+            s"CALL sys.migrate_file(source_type => 'hive', source_table => '$hiveDbName.hive_tbl', target_table => '$hiveDbName.paimon_tbl', delete_origin => false)")
+
+          // the hive table was not dropped, but its rows now live in paimon_tbl
+          checkAnswer(spark.sql("SELECT * FROM hive_tbl ORDER BY id"), Nil)
+
+          checkAnswer(
+            spark.sql("SELECT * FROM paimon_tbl ORDER BY id"),
+            Row("1", "a", "p1") :: Row("2", "b", "p2") :: Row("3", "c", "p1") :: Row(
+              "4",
+              "d",
+              "p2") :: Nil)
+        }
+      }
+    })
+
   Seq("parquet", "orc", "avro").foreach(
     format => {
       test(s"Paimon migrate file procedure: migrate $format partitioned 
table") {
@@ -92,4 +129,43 @@ class MigrateFileProcedureTest extends PaimonHiveTestBase {
         }
       }
     })
+
+  // Partitioned variant: migrate with delete_origin => false, so hive_tbl keeps its metadata
+  // and is still queryable afterwards, but holds no rows.
+  Seq("parquet", "orc", "avro").foreach(
+    format => {
+      test(
+        s"Paimon migrate file procedure: migrate $format partitioned table with delete_origin = false") {
+        withTable("hive_tbl", "paimon_tbl") {
+          // create hive table
+          spark.sql(s"""
+                       |CREATE TABLE hive_tbl (id STRING, name STRING, pt STRING)
+                       |USING $format
+                       |PARTITIONED BY (pt)
+                       |""".stripMargin)
+
+          spark.sql(s"INSERT INTO hive_tbl VALUES ('1', 'a', 'p1'), ('2', 'b', 'p2')")
+
+          // create paimon table
+          spark.sql(s"""
+                       |CREATE TABLE paimon_tbl (id STRING, name STRING, pt STRING)
+                       |USING PAIMON
+                       |TBLPROPERTIES ('file.format'='$format', 'bucket'='-1')
+                       |PARTITIONED BY (pt)
+                       |""".stripMargin)
+
+          spark.sql(s"INSERT INTO paimon_tbl VALUES ('3', 'c', 'p1'), ('4', 'd', 'p2')")
+
+          spark.sql(
+            s"CALL sys.migrate_file(source_type => 'hive', source_table => '$hiveDbName.hive_tbl', target_table => '$hiveDbName.paimon_tbl', delete_origin => false)")
+
+          checkAnswer(
+            spark.sql("SELECT * FROM paimon_tbl ORDER BY id"),
+            Row("1", "a", "p1") :: Row("2", "b", "p2") :: Row("3", "c", "p1") :: Row(
+              "4",
+              "d",
+              "p2") :: Nil)
+
+          // the hive table was not dropped, but its rows now live in paimon_tbl
+          checkAnswer(spark.sql("SELECT * FROM hive_tbl ORDER BY id"), Nil)
+        }
+      }
+    })
 }

Reply via email to