This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b6cdf74e [hive] hive create table should force primary key not null (#1837)
5b6cdf74e is described below

commit 5b6cdf74e97065ab56c9da57c36fa3f28d0ec9a5
Author: HZY <[email protected]>
AuthorDate: Mon Aug 21 14:35:10 2023 +0800

    [hive] hive create table should force primary key not null (#1837)
---
 .../main/java/org/apache/paimon/schema/Schema.java | 34 ++++++++++++++++++--
 .../org/apache/paimon/schema/SchemaManager.java    | 36 ----------------------
 .../apache/paimon/flink/CatalogTableITCase.java    |  6 ++--
 .../apache/paimon/hive/HiveCatalogITCaseBase.java  | 22 +++++++++++--
 4 files changed, 55 insertions(+), 43 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java b/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java
index 7f464b586..15a361aec 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.schema;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.annotation.Public;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
@@ -63,10 +64,11 @@ public class Schema {
             List<String> primaryKeys,
             Map<String, String> options,
             String comment) {
-        this.fields = normalizeFields(fields, primaryKeys, partitionKeys);
-        this.partitionKeys = partitionKeys;
-        this.primaryKeys = primaryKeys;
         this.options = new HashMap<>(options);
+        this.partitionKeys = normalizePartitionKeys(partitionKeys);
+        this.primaryKeys = normalizePrimaryKeys(primaryKeys);
+        this.fields = normalizeFields(fields, this.primaryKeys, this.partitionKeys);
+
         this.comment = comment;
     }
 
@@ -152,6 +154,32 @@ public class Schema {
         return newFields;
     }
 
+    private List<String> normalizePrimaryKeys(List<String> primaryKeys) {
+        if (options.containsKey(CoreOptions.PRIMARY_KEY.key())) {
+            if (!primaryKeys.isEmpty()) {
+                throw new RuntimeException(
+                        "Cannot define primary key on DDL and table options at the same time.");
+            }
+            String pk = options.get(CoreOptions.PRIMARY_KEY.key());
+            primaryKeys = Arrays.asList(pk.split(","));
+            options.remove(CoreOptions.PRIMARY_KEY.key());
+        }
+        return primaryKeys;
+    }
+
+    private List<String> normalizePartitionKeys(List<String> partitionKeys) {
+        if (options.containsKey(CoreOptions.PARTITION.key())) {
+            if (!partitionKeys.isEmpty()) {
+                throw new RuntimeException(
+                        "Cannot define partition on DDL and table options at the same time.");
+            }
+            String partitions = options.get(CoreOptions.PARTITION.key());
+            partitionKeys = Arrays.asList(partitions.split(","));
+            options.remove(CoreOptions.PARTITION.key());
+        }
+        return partitionKeys;
+    }
+
     private static Set<String> duplicate(List<String> names) {
         return names.stream()
                 .filter(name -> Collections.frequency(names, name) > 1)
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index 739abf118..15c4e4b04 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -128,42 +128,6 @@ public class SchemaManager implements Serializable {
             Map<String, String> options = schema.options();
             int highestFieldId = RowType.currentHighestFieldId(fields);
 
-            List<String> columnNames =
-                    schema.fields().stream().map(DataField::name).collect(Collectors.toList());
-            if (options.containsKey(CoreOptions.PRIMARY_KEY.key())) {
-                if (!primaryKeys.isEmpty()) {
-                    throw new RuntimeException(
-                            "Cannot define primary key on DDL and table options at the same time.");
-                }
-                String pk = options.get(CoreOptions.PRIMARY_KEY.key());
-                primaryKeys = Arrays.asList(pk.split(","));
-                boolean exists = columnNames.containsAll(primaryKeys);
-                if (!exists) {
-                    throw new RuntimeException(
-                            String.format(
-                                    "Primary key column '%s' is not defined in the schema.",
-                                    primaryKeys));
-                }
-                options.remove(CoreOptions.PRIMARY_KEY.key());
-            }
-
-            if (options.containsKey(CoreOptions.PARTITION.key())) {
-                if (!partitionKeys.isEmpty()) {
-                    throw new RuntimeException(
-                            "Cannot define partition on DDL and table options at the same time.");
-                }
-                String partitions = options.get(CoreOptions.PARTITION.key());
-                partitionKeys = Arrays.asList(partitions.split(","));
-                boolean exists = columnNames.containsAll(partitionKeys);
-                if (!exists) {
-                    throw new RuntimeException(
-                            String.format(
-                                    "Partition column '%s' is not defined in the schema.",
-                                    partitionKeys));
-                }
-                options.remove(CoreOptions.PARTITION.key());
-            }
-
             TableSchema newSchema =
                     new TableSchema(
                             0,
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
index b9a148d4b..092c6ddba 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
@@ -331,7 +331,8 @@ public class CatalogTableITCase extends CatalogITCaseBase {
                         () ->
                                 sql(
                                         "CREATE TABLE t_pk_not_exist_as WITH ('primary-key' = 'aaa') AS SELECT * FROM t_pk_not_exist"))
-                .hasRootCauseMessage("Primary key column '[aaa]' is not defined in the schema.");
+                .hasRootCauseMessage(
+                        "Table column [user_id, item_id, behavior, dt, hh] should include all primary key constraint [aaa]");
 
         // primary key in option and DDL.
         assertThatThrownBy(
@@ -362,7 +363,8 @@ public class CatalogTableITCase extends CatalogITCaseBase {
                         () ->
                                 sql(
                                         "CREATE TABLE t_partition_not_exist_as WITH ('partition' = 'aaa') AS SELECT * FROM t_partition_not_exist"))
-                .hasRootCauseMessage("Partition column '[aaa]' is not defined in the schema.");
+                .hasRootCauseMessage(
+                        "Table column [user_id, item_id, behavior, dt, hh] should include all partition fields [aaa]");
 
         // partition in option and DDL.
         assertThatThrownBy(
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
index ad5a318ff..51be412c7 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
@@ -342,6 +342,22 @@ public abstract class HiveCatalogITCaseBase {
         assertThat(actual).contains(Row.of(1, "Apache"), Row.of(2, "Paimon"));
     }
 
+    @Test
+    public void testHiveCreateAndFlinkInsertRead() throws Exception {
+        hiveShell.execute("SET hive.metastore.warehouse.dir=" + path);
+        hiveShell.execute(
+                "CREATE TABLE hive_test_table ( a INT, b STRING ) "
+                        + "STORED BY '"
+                        + PaimonStorageHandler.class.getName()
+                        + "'"
+                        + "TBLPROPERTIES ("
+                        + "  'primary-key'='a'"
+                        + ")");
+        tEnv.executeSql("INSERT INTO hive_test_table VALUES (1, 'Apache'), (2, 'Paimon')");
+        List<Row> actual = collect("SELECT * FROM hive_test_table");
+        assertThat(actual).contains(Row.of(1, "Apache"), Row.of(2, "Paimon"));
+    }
+
     @Test
     public void testCreateTableAs() throws Exception {
         tEnv.executeSql("CREATE TABLE t (a INT)").await();
@@ -446,7 +462,8 @@ public abstract class HiveCatalogITCaseBase {
                                 tEnv.executeSql(
                                                 "CREATE TABLE t_pk_not_exist_as WITH ('primary-key' = 'aaa') AS SELECT * FROM t_pk_not_exist")
                                         .await())
-                .hasRootCauseMessage("Primary key column '[aaa]' is not defined in the schema.");
+                .hasRootCauseMessage(
+                        "Table column [user_id, item_id, behavior, dt, hh] should include all primary key constraint [aaa]");
 
         // primary key in option and DDL.
         assertThatThrownBy(
@@ -480,7 +497,8 @@ public abstract class HiveCatalogITCaseBase {
                                 tEnv.executeSql(
                                                 "CREATE TABLE t_partition_not_exist_as WITH ('partition' = 'aaa') AS SELECT * FROM t_partition_not_exist")
                                         .await())
-                .hasRootCauseMessage("Partition column '[aaa]' is not defined in the schema.");
+                .hasRootCauseMessage(
+                        "Table column [user_id, item_id, behavior, dt, hh] should include all partition fields [aaa]");
 
         // partition in option and DDL.
         assertThatThrownBy(

Reply via email to