This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new c4b7d25eb [flink] Support migrate file action (#3794)
c4b7d25eb is described below

commit c4b7d25eba1e61aabbb77fcee4c5f2cb9c937f49
Author: xuzifu666 <[email protected]>
AuthorDate: Tue Jul 23 17:43:07 2024 +0800

    [flink] Support migrate file action (#3794)
---
 .../paimon/flink/action/MigrateFileAction.java     | 63 +++++++++++++++++
 .../flink/action/MigrateFileActionFactory.java     | 80 ++++++++++++++++++++++
 .../services/org.apache.paimon.factories.Factory   |  1 +
 .../hive/procedure/MigrateFileProcedureITCase.java | 38 ++++++++--
 4 files changed, 177 insertions(+), 5 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateFileAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateFileAction.java
new file mode 100644
index 000000000..0f2404685
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateFileAction.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.flink.procedure.MigrateFileProcedure;
+
+import org.apache.flink.table.procedure.DefaultProcedureContext;
+
+import java.util.Map;
+
+/** Migrate from external hive table to paimon table. */
+public class MigrateFileAction extends ActionBase {
+
+    private final String connector;
+    private final String sourceTable;
+    private final String targetTable;
+    private final String tableProperties;
+    private boolean deleteOrigin;
+
+    public MigrateFileAction(
+            String connector,
+            String warehouse,
+            String sourceTable,
+            String targetTable,
+            boolean deleteOrigin,
+            Map<String, String> catalogConfig,
+            String tableProperties) {
+        super(warehouse, catalogConfig);
+        this.connector = connector;
+        this.sourceTable = sourceTable;
+        this.targetTable = targetTable;
+        this.deleteOrigin = deleteOrigin;
+        this.tableProperties = tableProperties;
+    }
+
+    @Override
+    public void run() throws Exception {
+        MigrateFileProcedure migrateTableProcedure = new 
MigrateFileProcedure();
+        migrateTableProcedure.withCatalog(catalog);
+        migrateTableProcedure.call(
+                new DefaultProcedureContext(env),
+                connector,
+                sourceTable,
+                targetTable,
+                deleteOrigin);
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateFileActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateFileActionFactory.java
new file mode 100644
index 000000000..3c15b03cf
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateFileActionFactory.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Action Factory for {@link MigrateFileAction}.
+ *
+ * <p>Registered under the identifier {@code migrate_file}; reads the command-line parameters
+ * declared as constants below and builds the action from them.
+ */
+public class MigrateFileActionFactory implements ActionFactory {
+
+    public static final String IDENTIFIER = "migrate_file";
+
+    private static final String SOURCE_TYPE = "source_type";
+
+    private static final String SOURCE_TABLE = "source_table";
+
+    private static final String TARGET_TABLE = "target_table";
+
+    private static final String DELETE_ORIGIN = "delete_origin";
+
+    private static final String OPTIONS = "options";
+
+    /** Returns the identifier under which this factory is looked up. */
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+
+    /**
+     * Builds a {@link MigrateFileAction} from the given parameters.
+     *
+     * @param params parsed command-line parameters
+     * @return an {@link Optional} that always contains the created action
+     */
+    @Override
+    public Optional<Action> create(MultipleParameterToolAdapter params) {
+        String warehouse = params.get(WAREHOUSE);
+        String connector = params.get(SOURCE_TYPE);
+        String sourceTable = params.get(SOURCE_TABLE);
+        String targetTable = params.get(TARGET_TABLE);
+        // Boolean.parseBoolean yields false for null or any value other than "true"
+        // (case-insensitive), so a missing flag presumably means "do not delete".
+        boolean deleteOrigin = Boolean.parseBoolean(params.get(DELETE_ORIGIN));
+        Map<String, String> catalogConfig = optionalConfigMap(params, CATALOG_CONF);
+        String tableConf = params.get(OPTIONS);
+
+        return Optional.of(
+                new MigrateFileAction(
+                        connector,
+                        warehouse,
+                        sourceTable,
+                        targetTable,
+                        deleteOrigin,
+                        catalogConfig,
+                        tableConf));
+    }
+
+    /** Prints a short description and the command-line syntax of the action to stdout. */
+    @Override
+    public void printHelp() {
+        System.out.println("Action \"migrate_file\" runs a migrating job from hive to paimon.");
+        System.out.println();
+
+        System.out.println("Syntax:");
+        System.out.println(
+                "  migrate_file --warehouse <warehouse_path> --source_type hive "
+                        + "--source_table <database.table_name> "
+                        + "--target_table <database.table_name> "
+                        + "--delete_origin true "
+                        + "[--catalog_conf <key>=<value>] "
+                        + "[--options <key>=<value>,<key>=<value>,...]");
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 6d3dc2491..e38d6ebd4 100644
--- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -25,6 +25,7 @@ org.apache.paimon.flink.action.CreateTagActionFactory
 org.apache.paimon.flink.action.DeleteTagActionFactory
 org.apache.paimon.flink.action.ResetConsumerActionFactory
 org.apache.paimon.flink.action.MigrateTableActionFactory
+org.apache.paimon.flink.action.MigrateFileActionFactory
 org.apache.paimon.flink.action.MigrateDatabaseActionFactory
 org.apache.paimon.flink.action.RemoveOrphanFilesActionFactory
 org.apache.paimon.flink.action.QueryServiceActionFactory
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java
index 19b5a4f97..721578631 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.hive.procedure;
 
 import org.apache.paimon.flink.action.ActionITCaseBase;
+import org.apache.paimon.flink.action.MigrateFileAction;
 import org.apache.paimon.flink.procedure.MigrateFileProcedure;
 import org.apache.paimon.hive.TestHiveMetastore;
 
@@ -33,7 +34,9 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 
 /** Tests for {@link MigrateFileProcedure}. */
@@ -56,19 +59,19 @@ public class MigrateFileProcedureITCase extends 
ActionITCaseBase {
     @Test
     public void testOrc() throws Exception {
         test("orc");
-        testWithDeleteOrigin("orc");
+        testMigrateFileAction("orc");
     }
 
     @Test
     public void testAvro() throws Exception {
         test("avro");
-        testWithDeleteOrigin("avro");
+        testMigrateFileAction("avro");
     }
 
     @Test
     public void testParquet() throws Exception {
         test("parquet");
-        testWithDeleteOrigin("parquet");
+        testMigrateFileAction("parquet");
     }
 
     public void test(String format) throws Exception {
@@ -103,7 +106,7 @@ public class MigrateFileProcedureITCase extends 
ActionITCaseBase {
         Assertions.assertThatList(r1).containsExactlyInAnyOrderElementsOf(r2);
     }
 
-    public void testWithDeleteOrigin(String format) throws Exception {
+    public void testMigrateFileAction(String format) throws Exception {
         TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
         tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')");
         tEnv.useCatalog("HIVE");
@@ -112,7 +115,12 @@ public class MigrateFileProcedureITCase extends 
ActionITCaseBase {
                 "CREATE TABLE hivetable01 (id string) PARTITIONED BY (id2 int, 
id3 int) STORED AS "
                         + format);
         tEnv.executeSql("INSERT INTO hivetable01 VALUES" + data(100)).await();
-        tEnv.executeSql("SHOW CREATE TABLE hivetable01");
+
+        tEnv.executeSql(
+                "CREATE TABLE hivetable02 (id string) PARTITIONED BY (id2 int, 
id3 int) STORED AS "
+                        + format);
+        tEnv.executeSql("INSERT INTO hivetable02 VALUES" + data(100)).await();
+        tEnv.executeSql("SHOW CREATE TABLE hivetable02");
 
         tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
         tEnv.executeSql("CREATE CATALOG PAIMON_GE WITH 
('type'='paimon-generic')");
@@ -127,12 +135,32 @@ public class MigrateFileProcedureITCase extends 
ActionITCaseBase {
         tEnv.useCatalog("PAIMON");
         tEnv.executeSql(
                 "CREATE TABLE paimontable01 (id STRING, id2 INT, id3 INT) 
PARTITIONED BY (id2, id3) with ('bucket' = '-1');");
+        tEnv.executeSql(
+                "CREATE TABLE paimontable02 (id STRING, id2 INT, id3 INT) 
PARTITIONED BY (id2, id3) with ('bucket' = '-1');");
         tEnv.executeSql(
                         "CALL sys.migrate_file('hive', 'default.hivetable01', 
'default.paimontable01', false)")
                 .await();
+
+        tEnv.useCatalog("PAIMON_GE");
+        Map<String, String> catalogConf = new HashMap<>();
+        catalogConf.put("metastore", "hive");
+        catalogConf.put("uri", "thrift://localhost:" + PORT);
+        MigrateFileAction migrateFileAction =
+                new MigrateFileAction(
+                        "hive",
+                        
System.getProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname),
+                        "default.hivetable02",
+                        "default.paimontable02",
+                        false,
+                        catalogConf,
+                        "");
+        migrateFileAction.run();
+
         tEnv.useCatalog("HIVE");
         List<Row> r1 = ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM 
hivetable01").collect());
+        List<Row> r2 = ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM 
hivetable02").collect());
         Assertions.assertThat(r1.size() == 0);
+        Assertions.assertThat(r2.size() == 0);
     }
 
     private String data(int i) {

Reply via email to