This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 19cf86d275 [hive][clone] check compatibility if paimon table already
exists (#5506)
19cf86d275 is described below
commit 19cf86d275125118552d2b95b7d28437de848520
Author: LsomeYeah <[email protected]>
AuthorDate: Tue Apr 22 18:41:17 2025 +0800
[hive][clone] check compatibility if paimon table already exists (#5506)
---
.../flink/clone/hive/CopyProcessFunction.java | 4 +
.../flink/clone/hive/ListHiveFilesFunction.java | 57 ++++++++++++-
.../hive/procedure/CloneHiveActionITCase.java | 97 ++++++++++++++++++++++
3 files changed, 157 insertions(+), 1 deletion(-)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/CopyProcessFunction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/CopyProcessFunction.java
index f1f381aaed..76834c8857 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/CopyProcessFunction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/CopyProcessFunction.java
@@ -28,6 +28,8 @@ import org.apache.paimon.table.Table;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
@@ -38,6 +40,8 @@ import static
org.apache.paimon.flink.clone.hive.CloneHiveUtils.getRootHiveCatal
/** Abstract function for copying tables. */
public abstract class CopyProcessFunction<I, O> extends ProcessFunction<I, O> {
+ protected static final Logger LOG =
LoggerFactory.getLogger(CopyProcessFunction.class);
+
protected final Map<String, String> sourceCatalogConfig;
protected final Map<String, String> targetCatalogConfig;
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/ListHiveFilesFunction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/ListHiveFilesFunction.java
index f658a2e27d..a49147e826 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/ListHiveFilesFunction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/ListHiveFilesFunction.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.clone.hive;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.predicate.SimpleSqlPredicateConvertor;
import org.apache.paimon.hive.migrate.HiveCloneUtils;
@@ -29,6 +30,8 @@ import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -37,10 +40,12 @@ import org.apache.flink.util.Collector;
import javax.annotation.Nullable;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import static org.apache.paimon.utils.Preconditions.checkNotNull;
+import static org.apache.paimon.utils.Preconditions.checkState;
/** List files for table. */
public class ListHiveFilesFunction
@@ -78,7 +83,23 @@ public class ListHiveFilesFunction
schema.primaryKeys(),
options,
schema.comment());
- targetCatalog.createTable(tuple.f1, schema, false);
+ try {
+ Table existedTable = targetCatalog.getTable(tuple.f1);
+
+ checkState(
+ existedTable instanceof FileStoreTable,
+ String.format(
+ "existed paimon table '%s' is not a
FileStoreTable, but a %s",
+ tuple.f1, existedTable.getClass().getName()));
+ checkCompatible(schema, (FileStoreTable) existedTable);
+
+ LOG.info("paimon table '{}' already exists, use it as target
table.", tuple.f1);
+ } catch (Catalog.TableNotExistException e) {
+ LOG.info("create target paimon table '{}'.", tuple.f1);
+
+ targetCatalog.createTable(tuple.f1, schema, false);
+ }
+
FileStoreTable table = (FileStoreTable)
targetCatalog.getTable(tuple.f1);
PartitionPredicate predicate =
getPartitionPredicate(whereSql,
table.schema().logicalPartitionType(), tuple.f0);
@@ -95,6 +116,40 @@ public class ListHiveFilesFunction
}
}
+    /**
+     * Checks whether an already existing Paimon table can be used as the clone target for a source
+     * table with the given schema.
+     *
+     * <p>The existed table must have no primary keys, must use {@code bucket = -1}, must have the
+     * same set of partition keys as the source, and must contain every source field (matched by
+     * name) with the same type root. Additional fields in the existed table are allowed. Any
+     * violation fails through {@code checkState}.
+     *
+     * @param sourceSchema schema built for the source table
+     * @param existedTable the Paimon table that already exists under the target identifier
+     */
+    private void checkCompatible(Schema sourceSchema, FileStoreTable existedTable) {
+        Schema existedSchema = existedTable.schema().toSchema();
+
+        // check primary keys
+        checkState(
+                existedSchema.primaryKeys().isEmpty(),
+                "Can not clone data to existed paimon table which has primary keys. Existed paimon table is "
+                        + existedTable.name());
+
+        // check bucket
+        checkState(
+                existedTable.coreOptions().bucket() == -1,
+                "Can not clone data to existed paimon table which bucket is not -1. Existed paimon table is "
+                        + existedTable.name());
+
+        // check partition keys
+        // NOTE(review): keys are compared as a set, so a different key order is accepted; confirm
+        // that partition key order does not matter for the cloned partitions.
+        List<String> sourcePartitionFields = sourceSchema.partitionKeys();
+        List<String> existedPartitionFields = existedSchema.partitionKeys();
+        checkState(
+                sourcePartitionFields.size() == existedPartitionFields.size()
+                        && new HashSet<>(existedPartitionFields).containsAll(sourcePartitionFields),
+                "source table partition keys is not compatible with existed paimon table partition keys."
+                        + " Source partition keys: "
+                        + sourcePartitionFields
+                        + ", existed paimon table partition keys: "
+                        + existedPartitionFields);
+
+        // check all fields: every source field must exist in the existed table under the same
+        // name and with the same type root. Precision/length, nullability and nested structure
+        // are not compared. Extra fields in the existed table are allowed.
+        List<DataField> existedFields = existedSchema.fields();
+        for (DataField sourceField : sourceSchema.fields()) {
+            DataField existedField = findFieldByName(existedFields, sourceField.name());
+            checkState(
+                    existedField != null,
+                    "source table field '"
+                            + sourceField.name()
+                            + "' does not exist in existed paimon table "
+                            + existedTable.name());
+            checkState(
+                    existedField.type().getTypeRoot() == sourceField.type().getTypeRoot(),
+                    "source table field '"
+                            + sourceField.name()
+                            + "' has type "
+                            + sourceField.type()
+                            + ", but the field in existed paimon table "
+                            + existedTable.name()
+                            + " has type "
+                            + existedField.type());
+        }
+    }
+
+    /** Returns the field named {@code name} in {@code fields}, or {@code null} if there is none. */
+    @Nullable
+    private static DataField findFieldByName(List<DataField> fields, String name) {
+        for (DataField field : fields) {
+            if (field.name().equals(name)) {
+                return field;
+            }
+        }
+        return null;
+    }
+
@VisibleForTesting
@Nullable
static PartitionPredicate getPartitionPredicate(
diff --git
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneHiveActionITCase.java
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneHiveActionITCase.java
index 5296c3b0fb..65983082ca 100644
---
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneHiveActionITCase.java
+++
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneHiveActionITCase.java
@@ -45,6 +45,8 @@ import java.util.List;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+
/** Tests for {@link CloneHiveAction}. */
public class CloneHiveActionITCase extends ActionITCaseBase {
@@ -342,6 +344,101 @@ public class CloneHiveActionITCase extends
ActionITCaseBase {
Assertions.assertThatList(actualR2).containsExactlyInAnyOrderElementsOf(r2);
}
+ /**
+ * Clones the Hive table default.hivetable into a pre-created Paimon table test.test_table. The
+ * target is created from ddls()[ddlIndex]. Index 3 is expected to succeed and match the Hive rows.
+ * Indexes 0..2 are expected to fail with a root-cause message containing exceptionMsg()[ddlIndex].
+ */
+ @Test
+ public void testCloneWithExistedTable() throws Exception {
+ String format = "avro";
+
+ TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
+ tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')");
+ tEnv.useCatalog("HIVE");
+ tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
+ tEnv.executeSql(
+ "CREATE TABLE hivetable (id string) PARTITIONED BY (id2 int,
id3 int) STORED AS "
+ + format);
+ tEnv.executeSql("INSERT INTO hivetable VALUES" + data(100)).await();
+
+ // Read the source rows through the generic catalog to get the expected result.
+ tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
+ tEnv.executeSql("CREATE CATALOG PAIMON_GE WITH
('type'='paimon-generic')");
+ tEnv.useCatalog("PAIMON_GE");
+
+ String query = "SELECT * FROM hivetable";
+ List<Row> r1 = ImmutableList.copyOf(tEnv.executeSql(query).collect());
+
+ tEnv.executeSql(
+ "CREATE CATALOG PAIMON WITH ('type'='paimon', 'warehouse' = '"
+ warehouse + "')");
+ tEnv.useCatalog("PAIMON");
+ tEnv.executeSql("CREATE DATABASE test");
+ // Pre-create the clone target test.test_table from one of the ddls() variants.
+ // NOTE(review): ddlIndex is fixed at 3. The randomized pick
+ // (ThreadLocalRandom.current().nextInt(0, 4)) was left commented out, so the
+ // incompatible-table branch below never runs. Consider exercising every ddls() entry,
+ // and keep exceptionMsg() in sync with the messages thrown by checkCompatible.
+ int ddlIndex = 3;
+ tEnv.executeSql(ddls()[ddlIndex]);
+
+ List<String> args =
+ new ArrayList<>(
+ Arrays.asList(
+ "clone_hive",
+ "--database",
+ "default",
+ "--table",
+ "hivetable",
+ "--catalog_conf",
+ "metastore=hive",
+ "--catalog_conf",
+ "uri=thrift://localhost:" + PORT,
+ "--target_database",
+ "test",
+ "--target_table",
+ "test_table",
+ "--target_catalog_conf",
+ "warehouse=" + warehouse));
+
+ // ddls()[0..2] describe incompatible targets and must fail; ddls()[3] is compatible.
+ if (ddlIndex < 3) {
+ assertThatThrownBy(() -> createAction(CloneHiveAction.class,
args).run())
+ .rootCause()
+ .hasMessageContaining(exceptionMsg()[ddlIndex]);
+ } else {
+ createAction(CloneHiveAction.class, args).run();
+ FileStoreTable paimonTable =
+ paimonTable(tEnv, "PAIMON", Identifier.create("test",
"test_table"));
+
+
Assertions.assertThat(paimonTable.partitionKeys()).containsExactly("id2",
"id3");
+
+ List<Row> r2 =
+ ImmutableList.copyOf(
+ tEnv.executeSql("SELECT * FROM
test.test_table").collect());
+
+
Assertions.assertThatList(r1).containsExactlyInAnyOrderElementsOf(r2);
+ }
+ }
+
+    /**
+     * Candidate DDLs for the pre-created target table {@code test.test_table}, all with {@code
+     * 'bucket' = '-1'}. Index 0 declares a primary key, index 1 uses different partition keys,
+     * index 2 lacks the {@code id} column, and index 3 matches the Hive source table.
+     */
+    private String[] ddls() {
+        String template =
+                "CREATE TABLE test.test_table (%s) PARTITIONED BY (%s) with ('bucket' = '-1');";
+        return new String[] {
+            // has primary key
+            String.format(
+                    template,
+                    "id string, id2 int, id3 int, PRIMARY KEY (id, id2, id3) NOT ENFORCED",
+                    "id2, id3"),
+            // has different partition keys
+            String.format(template, "id string, id2 int, id3 int", "id, id3"),
+            // size of fields is different
+            String.format(template, "id2 int, id3 int", "id2, id3"),
+            // normal
+            String.format(template, "id string, id2 int, id3 int", "id2, id3")
+        };
+    }
+
+    /**
+     * Expected root-cause message fragments for the failing {@code ddls()} variants. Entry {@code i}
+     * is matched with {@code hasMessageContaining} against the failure caused by {@code ddls()[i]}.
+     * Entry 2 (target lacks a source column) must stay in sync with the field check in
+     * ListHiveFilesFunction#checkCompatible.
+     */
+    private String[] exceptionMsg() {
+        String partitionKeysMismatch =
+                "source table partition keys is not compatible with existed paimon table partition keys.";
+        String[] messages = new String[3];
+        messages[0] = "Can not clone data to existed paimon table which has primary keys";
+        messages[1] = partitionKeysMismatch;
+        messages[2] = partitionKeysMismatch;
+        return messages;
+    }
+
private static String data(int i) {
Random random = new Random();
StringBuilder stringBuilder = new StringBuilder();