This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 19cf86d275 [hive][clone] check compatibility if paimon table already exists (#5506)
19cf86d275 is described below

commit 19cf86d275125118552d2b95b7d28437de848520
Author: LsomeYeah <[email protected]>
AuthorDate: Tue Apr 22 18:41:17 2025 +0800

    [hive][clone] check compatibility if paimon table already exists (#5506)
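
    For context: with this change, clone_hive reuses a pre-existing target
    Paimon table instead of failing in createTable, provided the existing
    table has no primary keys, uses bucket = -1, and has partition keys and
    fields compatible with the source Hive table (the real check compares
    full DataField definitions, not just field names). Below is a minimal
    standalone sketch of that rule using plain Java collections rather than
    Paimon's Schema and FileStoreTable types; the class and method names are
    hypothetical and for illustration only.

        import java.util.HashSet;
        import java.util.List;

        // Hypothetical standalone mirror of the compatibility rule; the real
        // implementation lives in ListHiveFilesFunction#checkCompatible.
        public final class CloneCompatibilitySketch {

            static void checkCompatible(
                    List<String> sourcePartitionKeys,
                    List<String> sourceFieldNames,
                    List<String> existingPrimaryKeys,
                    int existingBucket,
                    List<String> existingPartitionKeys,
                    List<String> existingFieldNames) {
                // Cloned Hive data carries neither primary keys nor bucket
                // assignments, so the existing table must not declare them.
                if (!existingPrimaryKeys.isEmpty()) {
                    throw new IllegalStateException("existing table has primary keys");
                }
                if (existingBucket != -1) {
                    throw new IllegalStateException("existing table bucket is not -1");
                }
                // Partition keys must match exactly: same count, same members.
                if (sourcePartitionKeys.size() != existingPartitionKeys.size()
                        || !new HashSet<>(existingPartitionKeys).containsAll(sourcePartitionKeys)) {
                    throw new IllegalStateException("incompatible partition keys");
                }
                // The existing table may carry extra fields, but every source
                // field must already be present.
                if (existingFieldNames.size() < sourceFieldNames.size()
                        || !new HashSet<>(existingFieldNames).containsAll(sourceFieldNames)) {
                    throw new IllegalStateException("incompatible fields");
                }
            }

            public static void main(String[] args) {
                // A compatible pair: no primary keys, bucket = -1, identical
                // partition keys and field names.
                checkCompatible(
                        List.of("id2", "id3"),
                        List.of("id", "id2", "id3"),
                        List.of(),
                        -1,
                        List.of("id2", "id3"),
                        List.of("id", "id2", "id3"));
                System.out.println("compatible");
            }
        }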
---
 .../flink/clone/hive/CopyProcessFunction.java      |  4 +
 .../flink/clone/hive/ListHiveFilesFunction.java    | 57 ++++++++++++-
 .../hive/procedure/CloneHiveActionITCase.java      | 97 ++++++++++++++++++++++
 3 files changed, 157 insertions(+), 1 deletion(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/CopyProcessFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/CopyProcessFunction.java
index f1f381aaed..76834c8857 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/CopyProcessFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/CopyProcessFunction.java
@@ -28,6 +28,8 @@ import org.apache.paimon.table.Table;
 import org.apache.flink.api.common.functions.OpenContext;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -38,6 +40,8 @@ import static org.apache.paimon.flink.clone.hive.CloneHiveUtils.getRootHiveCatal
 /** Abstract function for copying tables. */
 public abstract class CopyProcessFunction<I, O> extends ProcessFunction<I, O> {
 
+    protected static final Logger LOG = LoggerFactory.getLogger(CopyProcessFunction.class);
+
     protected final Map<String, String> sourceCatalogConfig;
     protected final Map<String, String> targetCatalogConfig;
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/ListHiveFilesFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/ListHiveFilesFunction.java
index f658a2e27d..a49147e826 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/ListHiveFilesFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/ListHiveFilesFunction.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.clone.hive;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.predicate.SimpleSqlPredicateConvertor;
 import org.apache.paimon.hive.migrate.HiveCloneUtils;
@@ -29,6 +30,8 @@ import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.RowType;
 
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -37,10 +40,12 @@ import org.apache.flink.util.Collector;
 
 import javax.annotation.Nullable;
 
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
 import static org.apache.paimon.utils.Preconditions.checkNotNull;
+import static org.apache.paimon.utils.Preconditions.checkState;
 
 /** List files for table. */
 public class ListHiveFilesFunction
@@ -78,7 +83,23 @@ public class ListHiveFilesFunction
                         schema.primaryKeys(),
                         options,
                         schema.comment());
-        targetCatalog.createTable(tuple.f1, schema, false);
+        try {
+            Table existedTable = targetCatalog.getTable(tuple.f1);
+
+            checkState(
+                    existedTable instanceof FileStoreTable,
+                    String.format(
+                            "existing Paimon table '%s' is not a FileStoreTable, but a %s",
+                            tuple.f1, existedTable.getClass().getName()));
+            checkCompatible(schema, (FileStoreTable) existedTable);
+
+            LOG.info("Paimon table '{}' already exists, using it as the target table.", tuple.f1);
+        } catch (Catalog.TableNotExistException e) {
+            LOG.info("Creating target Paimon table '{}'.", tuple.f1);
+
+            targetCatalog.createTable(tuple.f1, schema, false);
+        }
+
         FileStoreTable table = (FileStoreTable) targetCatalog.getTable(tuple.f1);
         PartitionPredicate predicate =
                 getPartitionPredicate(whereSql, table.schema().logicalPartitionType(), tuple.f0);
@@ -95,6 +116,40 @@ public class ListHiveFilesFunction
         }
     }
 
+    private void checkCompatible(Schema sourceSchema, FileStoreTable existedTable) {
+        Schema existedSchema = existedTable.schema().toSchema();
+
+        // check primary keys
+        checkState(
+                existedSchema.primaryKeys().isEmpty(),
+                "Cannot clone data to an existing Paimon table that has primary keys. Existing Paimon table is "
+                        + existedTable.name());
+
+        // check bucket
+        checkState(
+                existedTable.coreOptions().bucket() == -1,
+                "Cannot clone data to an existing Paimon table whose bucket is not -1. Existing Paimon table is "
+                        + existedTable.name());
+
+        // check partition keys
+        List<String> sourcePartitionFields = sourceSchema.partitionKeys();
+        List<String> existedPartitionFields = existedSchema.partitionKeys();
+
+        checkState(
+                sourcePartitionFields.size() == existedPartitionFields.size()
+                        && new HashSet<>(existedPartitionFields).containsAll(sourcePartitionFields),
+                "Source table partition keys are not compatible with the existing Paimon table's partition keys.");
+
+        // check all fields
+        List<DataField> sourceFields = sourceSchema.fields();
+        List<DataField> existedFields = existedSchema.fields();
+
+        checkState(
+                existedFields.size() >= sourceFields.size()
+                        && new HashSet<>(existedFields).containsAll(sourceFields),
+                "Source table fields are not compatible with the existing Paimon table's fields.");
+    }
+
     @VisibleForTesting
     @Nullable
     static PartitionPredicate getPartitionPredicate(
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneHiveActionITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneHiveActionITCase.java
index 5296c3b0fb..65983082ca 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneHiveActionITCase.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneHiveActionITCase.java
@@ -45,6 +45,8 @@ import java.util.List;
 import java.util.Random;
 import java.util.concurrent.ThreadLocalRandom;
 
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+
 /** Tests for {@link CloneHiveAction}. */
 public class CloneHiveActionITCase extends ActionITCaseBase {
 
@@ -342,6 +344,101 @@ public class CloneHiveActionITCase extends ActionITCaseBase {
         Assertions.assertThatList(actualR2).containsExactlyInAnyOrderElementsOf(r2);
     }
 
+    @Test
+    public void testCloneWithExistedTable() throws Exception {
+        String format = "avro";
+
+        TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
+        tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')");
+        tEnv.useCatalog("HIVE");
+        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
+        tEnv.executeSql(
+                "CREATE TABLE hivetable (id string) PARTITIONED BY (id2 int, id3 int) STORED AS "
+                        + format);
+        tEnv.executeSql("INSERT INTO hivetable VALUES" + data(100)).await();
+
+        tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
+        tEnv.executeSql("CREATE CATALOG PAIMON_GE WITH ('type'='paimon-generic')");
+        tEnv.useCatalog("PAIMON_GE");
+
+        String query = "SELECT * FROM hivetable";
+        List<Row> r1 = ImmutableList.copyOf(tEnv.executeSql(query).collect());
+
+        tEnv.executeSql(
+                "CREATE CATALOG PAIMON WITH ('type'='paimon', 'warehouse' = '" + warehouse + "')");
+        tEnv.useCatalog("PAIMON");
+        tEnv.executeSql("CREATE DATABASE test");
+        // create a paimon table with the same name, randomly picking one of
+        // the four DDL variants below
+        int ddlIndex = ThreadLocalRandom.current().nextInt(0, 4);
+        tEnv.executeSql(ddls()[ddlIndex]);
+
+        List<String> args =
+                new ArrayList<>(
+                        Arrays.asList(
+                                "clone_hive",
+                                "--database",
+                                "default",
+                                "--table",
+                                "hivetable",
+                                "--catalog_conf",
+                                "metastore=hive",
+                                "--catalog_conf",
+                                "uri=thrift://localhost:" + PORT,
+                                "--target_database",
+                                "test",
+                                "--target_table",
+                                "test_table",
+                                "--target_catalog_conf",
+                                "warehouse=" + warehouse));
+
+        if (ddlIndex < 3) {
+            assertThatThrownBy(() -> createAction(CloneHiveAction.class, args).run())
+                    .rootCause()
+                    .hasMessageContaining(exceptionMsg()[ddlIndex]);
+        } else {
+            createAction(CloneHiveAction.class, args).run();
+            FileStoreTable paimonTable =
+                    paimonTable(tEnv, "PAIMON", Identifier.create("test", "test_table"));
+
+            Assertions.assertThat(paimonTable.partitionKeys()).containsExactly("id2", "id3");
+
+            List<Row> r2 =
+                    ImmutableList.copyOf(
+                            tEnv.executeSql("SELECT * FROM test.test_table").collect());
+
+            Assertions.assertThatList(r1).containsExactlyInAnyOrderElementsOf(r2);
+        }
+    }
+
+    private String[] ddls() {
+        // has primary key
+        String ddl0 =
+                "CREATE TABLE test.test_table (id string, id2 int, id3 int, PRIMARY KEY (id, id2, id3) NOT ENFORCED) "
+                        + "PARTITIONED BY (id2, id3) with ('bucket' = '-1');";
+        // has different partition keys
+        String ddl1 =
+                "CREATE TABLE test.test_table (id string, id2 int, id3 int) "
+                        + "PARTITIONED BY (id, id3) with ('bucket' = '-1');";
+        // size of fields is different
+        String ddl2 =
+                "CREATE TABLE test.test_table (id2 int, id3 int) "
+                        + "PARTITIONED BY (id2, id3) with ('bucket' = '-1');";
+        // normal
+        String ddl3 =
+                "CREATE TABLE test.test_table (id string, id2 int, id3 int) "
+                        + "PARTITIONED BY (id2, id3) with ('bucket' = '-1');";
+        return new String[] {ddl0, ddl1, ddl2, ddl3};
+    }
+
+    private String[] exceptionMsg() {
+        return new String[] {
+            "Cannot clone data to an existing Paimon table that has primary keys",
+            "Source table partition keys are not compatible with the existing Paimon table's partition keys.",
+            "Source table fields are not compatible with the existing Paimon table's fields."
+        };
+    }
+
     private static String data(int i) {
         Random random = new Random();
         StringBuilder stringBuilder = new StringBuilder();
