This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new c4b7d25eb [flink] Support migrate file action (#3794)
c4b7d25eb is described below
commit c4b7d25eba1e61aabbb77fcee4c5f2cb9c937f49
Author: xuzifu666 <[email protected]>
AuthorDate: Tue Jul 23 17:43:07 2024 +0800
[flink] Support migrate file action (#3794)
---
.../paimon/flink/action/MigrateFileAction.java | 63 +++++++++++++++++
.../flink/action/MigrateFileActionFactory.java | 80 ++++++++++++++++++++++
.../services/org.apache.paimon.factories.Factory | 1 +
.../hive/procedure/MigrateFileProcedureITCase.java | 38 ++++++++--
4 files changed, 177 insertions(+), 5 deletions(-)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateFileAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateFileAction.java
new file mode 100644
index 000000000..0f2404685
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateFileAction.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.flink.procedure.MigrateFileProcedure;
+
+import org.apache.flink.table.procedure.DefaultProcedureContext;
+
+import java.util.Map;
+
+/**
+ * Action that migrates the files of a source table (for example a Hive table) into a Paimon table
+ * by delegating to {@link MigrateFileProcedure}.
+ */
+public class MigrateFileAction extends ActionBase {
+
+    /** Source table type, e.g. {@code hive}; passed to the procedure as its connector argument. */
+    private final String connector;
+
+    /** Source table identifier, e.g. {@code database.table}. */
+    private final String sourceTable;
+
+    /** Target Paimon table identifier, e.g. {@code database.table}. */
+    private final String targetTable;
+
+    /**
+     * Raw table options string supplied by the caller.
+     *
+     * <p>NOTE(review): this value is stored but never passed to {@link MigrateFileProcedure} in
+     * {@link #run()}, so it currently has no effect. Confirm whether it should be forwarded or
+     * removed.
+     */
+    private final String tableProperties;
+
+    /** Passed to the procedure as its {@code deleteOrigin} argument. */
+    private final boolean deleteOrigin;
+
+    /**
+     * Creates the action.
+     *
+     * @param connector source table type, e.g. {@code hive}
+     * @param warehouse warehouse path handed to {@link ActionBase}
+     * @param sourceTable source table identifier
+     * @param targetTable target Paimon table identifier
+     * @param deleteOrigin forwarded to the procedure's {@code deleteOrigin} argument
+     * @param catalogConfig catalog options handed to {@link ActionBase}
+     * @param tableProperties table options string; currently unused (see the field note)
+     */
+    public MigrateFileAction(
+            String connector,
+            String warehouse,
+            String sourceTable,
+            String targetTable,
+            boolean deleteOrigin,
+            Map<String, String> catalogConfig,
+            String tableProperties) {
+        super(warehouse, catalogConfig);
+        this.connector = connector;
+        this.sourceTable = sourceTable;
+        this.targetTable = targetTable;
+        this.deleteOrigin = deleteOrigin;
+        this.tableProperties = tableProperties;
+    }
+
+    /**
+     * Runs {@link MigrateFileProcedure} against this action's catalog, using a procedure context
+     * built from this action's execution environment.
+     *
+     * @throws Exception whatever the procedure call throws
+     */
+    @Override
+    public void run() throws Exception {
+        MigrateFileProcedure migrateFileProcedure = new MigrateFileProcedure();
+        migrateFileProcedure.withCatalog(catalog);
+        migrateFileProcedure.call(
+                new DefaultProcedureContext(env),
+                connector,
+                sourceTable,
+                targetTable,
+                deleteOrigin);
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateFileActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateFileActionFactory.java
new file mode 100644
index 000000000..3c15b03cf
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateFileActionFactory.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Factory for {@link MigrateFileAction}, registered under the {@code migrate_file} identifier.
+ *
+ * <p>Reads the {@code --warehouse}, {@code --source_type}, {@code --source_table}, {@code
+ * --target_table}, {@code --delete_origin}, {@code --catalog_conf} and {@code --options} arguments
+ * and passes them to a new {@link MigrateFileAction}.
+ */
+public class MigrateFileActionFactory implements ActionFactory {
+
+    public static final String IDENTIFIER = "migrate_file";
+
+    /** Source table type, e.g. {@code hive}; forwarded to the action as its connector. */
+    private static final String SOURCE_TYPE = "source_type";
+
+    private static final String SOURCE_TABLE = "source_table";
+
+    private static final String TARGET_TABLE = "target_table";
+
+    private static final String DELETE_ORIGIN = "delete_origin";
+
+    private static final String OPTIONS = "options";
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+
+    /**
+     * Builds a {@link MigrateFileAction} from the parsed command-line parameters.
+     *
+     * <p>{@code --delete_origin} is parsed with {@link Boolean#parseBoolean(String)}. It is
+     * therefore {@code false} when omitted, or when set to anything other than {@code true}
+     * (case-insensitive).
+     *
+     * @param params parsed command-line parameters
+     * @return the configured action; never empty
+     */
+    @Override
+    public Optional<Action> create(MultipleParameterToolAdapter params) {
+        String warehouse = params.get(WAREHOUSE);
+        String connector = params.get(SOURCE_TYPE);
+        String sourceTable = params.get(SOURCE_TABLE);
+        String targetTable = params.get(TARGET_TABLE);
+        boolean deleteOrigin = Boolean.parseBoolean(params.get(DELETE_ORIGIN));
+        Map<String, String> catalogConfig = optionalConfigMap(params, CATALOG_CONF);
+        // NOTE(review): MigrateFileAction stores this value but does not appear to forward it to
+        // the migrate procedure. Confirm that --options is actually honoured.
+        String tableConf = params.get(OPTIONS);
+
+        MigrateFileAction migrateFileAction =
+                new MigrateFileAction(
+                        connector,
+                        warehouse,
+                        sourceTable,
+                        targetTable,
+                        deleteOrigin,
+                        catalogConfig,
+                        tableConf);
+        return Optional.of(migrateFileAction);
+    }
+
+    /** Prints usage information for the {@code migrate_file} action to standard output. */
+    @Override
+    public void printHelp() {
+        System.out.println("Action \"migrate_file\" runs a migrating job from hive to paimon.");
+        System.out.println();
+
+        System.out.println("Syntax:");
+        // --delete_origin is optional and defaults to false (see create()).
+        System.out.println(
+                "  migrate_file --warehouse <warehouse_path> --source_type hive "
+                        + "--source_table <database.table_name> "
+                        + "--target_table <database.table_name> "
+                        + "[--delete_origin <true/false>] "
+                        + "[--catalog_conf <key>=<value>] "
+                        + "[--options <key>=<value>,<key>=<value>,...]");
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 6d3dc2491..e38d6ebd4 100644
--- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -25,6 +25,7 @@ org.apache.paimon.flink.action.CreateTagActionFactory
org.apache.paimon.flink.action.DeleteTagActionFactory
org.apache.paimon.flink.action.ResetConsumerActionFactory
org.apache.paimon.flink.action.MigrateTableActionFactory
+org.apache.paimon.flink.action.MigrateFileActionFactory
org.apache.paimon.flink.action.MigrateDatabaseActionFactory
org.apache.paimon.flink.action.RemoveOrphanFilesActionFactory
org.apache.paimon.flink.action.QueryServiceActionFactory
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java
index 19b5a4f97..721578631 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java
@@ -19,6 +19,7 @@
package org.apache.paimon.hive.procedure;
import org.apache.paimon.flink.action.ActionITCaseBase;
+import org.apache.paimon.flink.action.MigrateFileAction;
import org.apache.paimon.flink.procedure.MigrateFileProcedure;
import org.apache.paimon.hive.TestHiveMetastore;
@@ -33,7 +34,9 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Random;
/** Tests for {@link MigrateFileProcedure}. */
@@ -56,19 +59,19 @@ public class MigrateFileProcedureITCase extends
ActionITCaseBase {
@Test
public void testOrc() throws Exception {
test("orc");
- testWithDeleteOrigin("orc");
+ testMigrateFileAction("orc");
}
@Test
public void testAvro() throws Exception {
test("avro");
- testWithDeleteOrigin("avro");
+ testMigrateFileAction("avro");
}
@Test
public void testParquet() throws Exception {
test("parquet");
- testWithDeleteOrigin("parquet");
+ testMigrateFileAction("parquet");
}
public void test(String format) throws Exception {
@@ -103,7 +106,7 @@ public class MigrateFileProcedureITCase extends
ActionITCaseBase {
Assertions.assertThatList(r1).containsExactlyInAnyOrderElementsOf(r2);
}
- public void testWithDeleteOrigin(String format) throws Exception {
+ public void testMigrateFileAction(String format) throws Exception {
TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')");
tEnv.useCatalog("HIVE");
@@ -112,7 +115,12 @@ public class MigrateFileProcedureITCase extends
ActionITCaseBase {
"CREATE TABLE hivetable01 (id string) PARTITIONED BY (id2 int,
id3 int) STORED AS "
+ format);
tEnv.executeSql("INSERT INTO hivetable01 VALUES" + data(100)).await();
- tEnv.executeSql("SHOW CREATE TABLE hivetable01");
+
+ tEnv.executeSql(
+ "CREATE TABLE hivetable02 (id string) PARTITIONED BY (id2 int,
id3 int) STORED AS "
+ + format);
+ tEnv.executeSql("INSERT INTO hivetable02 VALUES" + data(100)).await();
+ tEnv.executeSql("SHOW CREATE TABLE hivetable02");
tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
tEnv.executeSql("CREATE CATALOG PAIMON_GE WITH
('type'='paimon-generic')");
@@ -127,12 +135,32 @@ public class MigrateFileProcedureITCase extends
ActionITCaseBase {
tEnv.useCatalog("PAIMON");
tEnv.executeSql(
"CREATE TABLE paimontable01 (id STRING, id2 INT, id3 INT)
PARTITIONED BY (id2, id3) with ('bucket' = '-1');");
+ tEnv.executeSql(
+ "CREATE TABLE paimontable02 (id STRING, id2 INT, id3 INT)
PARTITIONED BY (id2, id3) with ('bucket' = '-1');");
tEnv.executeSql(
"CALL sys.migrate_file('hive', 'default.hivetable01',
'default.paimontable01', false)")
.await();
+
+ tEnv.useCatalog("PAIMON_GE");
+ Map<String, String> catalogConf = new HashMap<>();
+ catalogConf.put("metastore", "hive");
+ catalogConf.put("uri", "thrift://localhost:" + PORT);
+ MigrateFileAction migrateFileAction =
+ new MigrateFileAction(
+ "hive",
+
System.getProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname),
+ "default.hivetable02",
+ "default.paimontable02",
+ false,
+ catalogConf,
+ "");
+ migrateFileAction.run();
+
tEnv.useCatalog("HIVE");
List<Row> r1 = ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM
hivetable01").collect());
+ List<Row> r2 = ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM
hivetable02").collect());
Assertions.assertThat(r1.size() == 0);
+ Assertions.assertThat(r2.size() == 0);
}
private String data(int i) {