This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 5b6cdf74e [hive] hive create table should force primary key not null (#1837)
5b6cdf74e is described below
commit 5b6cdf74e97065ab56c9da57c36fa3f28d0ec9a5
Author: HZY <[email protected]>
AuthorDate: Mon Aug 21 14:35:10 2023 +0800
[hive] hive create table should force primary key not null (#1837)
---
.../main/java/org/apache/paimon/schema/Schema.java | 34 ++++++++++++++++++--
.../org/apache/paimon/schema/SchemaManager.java | 36 ----------------------
.../apache/paimon/flink/CatalogTableITCase.java | 6 ++--
.../apache/paimon/hive/HiveCatalogITCaseBase.java | 22 +++++++++++--
4 files changed, 55 insertions(+), 43 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java b/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java
index 7f464b586..15a361aec 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java
@@ -18,6 +18,7 @@
package org.apache.paimon.schema;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.Public;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
@@ -63,10 +64,11 @@ public class Schema {
List<String> primaryKeys,
Map<String, String> options,
String comment) {
- this.fields = normalizeFields(fields, primaryKeys, partitionKeys);
- this.partitionKeys = partitionKeys;
- this.primaryKeys = primaryKeys;
this.options = new HashMap<>(options);
+ this.partitionKeys = normalizePartitionKeys(partitionKeys);
+ this.primaryKeys = normalizePrimaryKeys(primaryKeys);
+ this.fields = normalizeFields(fields, this.primaryKeys,
this.partitionKeys);
+
this.comment = comment;
}
@@ -152,6 +154,32 @@ public class Schema {
return newFields;
}
+ /**
+ * Resolves the primary keys, optionally taking them from the {@link CoreOptions#PRIMARY_KEY}
+ * table option instead of the DDL.
+ *
+ * <p>If the option is set, the DDL must not also declare primary keys. The option value is
+ * split on commas; whitespace around each name is dropped, so {@code "a, b"} yields
+ * {@code [a, b]}. The option is then removed from {@link #options}.
+ *
+ * @param primaryKeys primary keys declared in the DDL
+ * @return the DDL keys, or the keys parsed from the table option
+ * @throws RuntimeException if primary keys are defined both in the DDL and in the options
+ */
+ private List<String> normalizePrimaryKeys(List<String> primaryKeys) {
+ if (options.containsKey(CoreOptions.PRIMARY_KEY.key())) {
+ if (!primaryKeys.isEmpty()) {
+ throw new RuntimeException(
+ "Cannot define primary key on DDL and table options at the same time.");
+ }
+ String pk = options.get(CoreOptions.PRIMARY_KEY.key());
+ // Trim around commas so "a, b" names columns "a" and "b", not "a" and " b".
+ primaryKeys = Arrays.asList(pk.trim().split("\\s*,\\s*"));
+ options.remove(CoreOptions.PRIMARY_KEY.key());
+ }
+ return primaryKeys;
+ }
+
+ /**
+ * Resolves the partition keys, optionally taking them from the {@link CoreOptions#PARTITION}
+ * table option instead of the DDL.
+ *
+ * <p>If the option is set, the DDL must not also declare partition keys. The option value is
+ * split on commas; whitespace around each name is dropped, so {@code "dt, hh"} yields
+ * {@code [dt, hh]}. The option is then removed from {@link #options}.
+ *
+ * @param partitionKeys partition keys declared in the DDL
+ * @return the DDL keys, or the keys parsed from the table option
+ * @throws RuntimeException if partition keys are defined both in the DDL and in the options
+ */
+ private List<String> normalizePartitionKeys(List<String> partitionKeys) {
+ if (options.containsKey(CoreOptions.PARTITION.key())) {
+ if (!partitionKeys.isEmpty()) {
+ throw new RuntimeException(
+ "Cannot define partition on DDL and table options at the same time.");
+ }
+ String partitions = options.get(CoreOptions.PARTITION.key());
+ // Trim around commas so "dt, hh" names columns "dt" and "hh", not "dt" and " hh".
+ partitionKeys = Arrays.asList(partitions.trim().split("\\s*,\\s*"));
+ options.remove(CoreOptions.PARTITION.key());
+ }
+ return partitionKeys;
+ }
+
private static Set<String> duplicate(List<String> names) {
return names.stream()
.filter(name -> Collections.frequency(names, name) > 1)
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index 739abf118..15c4e4b04 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -128,42 +128,6 @@ public class SchemaManager implements Serializable {
Map<String, String> options = schema.options();
int highestFieldId = RowType.currentHighestFieldId(fields);
- List<String> columnNames =
-
schema.fields().stream().map(DataField::name).collect(Collectors.toList());
- if (options.containsKey(CoreOptions.PRIMARY_KEY.key())) {
- if (!primaryKeys.isEmpty()) {
- throw new RuntimeException(
- "Cannot define primary key on DDL and table
options at the same time.");
- }
- String pk = options.get(CoreOptions.PRIMARY_KEY.key());
- primaryKeys = Arrays.asList(pk.split(","));
- boolean exists = columnNames.containsAll(primaryKeys);
- if (!exists) {
- throw new RuntimeException(
- String.format(
- "Primary key column '%s' is not defined in
the schema.",
- primaryKeys));
- }
- options.remove(CoreOptions.PRIMARY_KEY.key());
- }
-
- if (options.containsKey(CoreOptions.PARTITION.key())) {
- if (!partitionKeys.isEmpty()) {
- throw new RuntimeException(
- "Cannot define partition on DDL and table options
at the same time.");
- }
- String partitions = options.get(CoreOptions.PARTITION.key());
- partitionKeys = Arrays.asList(partitions.split(","));
- boolean exists = columnNames.containsAll(partitionKeys);
- if (!exists) {
- throw new RuntimeException(
- String.format(
- "Partition column '%s' is not defined in
the schema.",
- partitionKeys));
- }
- options.remove(CoreOptions.PARTITION.key());
- }
-
TableSchema newSchema =
new TableSchema(
0,
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
index b9a148d4b..092c6ddba 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
@@ -331,7 +331,8 @@ public class CatalogTableITCase extends CatalogITCaseBase {
() ->
sql(
"CREATE TABLE t_pk_not_exist_as WITH
('primary-key' = 'aaa') AS SELECT * FROM t_pk_not_exist"))
- .hasRootCauseMessage("Primary key column '[aaa]' is not
defined in the schema.");
+ .hasRootCauseMessage(
+ "Table column [user_id, item_id, behavior, dt, hh]
should include all primary key constraint [aaa]");
// primary key in option and DDL.
assertThatThrownBy(
@@ -362,7 +363,8 @@ public class CatalogTableITCase extends CatalogITCaseBase {
() ->
sql(
"CREATE TABLE t_partition_not_exist_as
WITH ('partition' = 'aaa') AS SELECT * FROM t_partition_not_exist"))
- .hasRootCauseMessage("Partition column '[aaa]' is not defined
in the schema.");
+ .hasRootCauseMessage(
+ "Table column [user_id, item_id, behavior, dt, hh]
should include all partition fields [aaa]");
// partition in option and DDL.
assertThatThrownBy(
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
index ad5a318ff..51be412c7 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
@@ -342,6 +342,22 @@ public abstract class HiveCatalogITCaseBase {
assertThat(actual).contains(Row.of(1, "Apache"), Row.of(2, "Paimon"));
}
+ /**
+ * Creates a Paimon table through Hive, declaring the primary key only via TBLPROPERTIES.
+ * Then writes and reads it through Flink.
+ */
+ @Test
+ public void testHiveCreateAndFlinkInsertRead() throws Exception {
+ hiveShell.execute("SET hive.metastore.warehouse.dir=" + path);
+ hiveShell.execute(
+ "CREATE TABLE hive_test_table ( a INT, b STRING ) "
+ + "STORED BY '"
+ + PaimonStorageHandler.class.getName()
+ + "' "
+ + "TBLPROPERTIES ("
+ + " 'primary-key'='a'"
+ + ")");
+ // executeSql only submits the INSERT job; await() so the read below sees the committed rows.
+ tEnv.executeSql("INSERT INTO hive_test_table VALUES (1, 'Apache'), (2, 'Paimon')").await();
+ List<Row> actual = collect("SELECT * FROM hive_test_table");
+ assertThat(actual).contains(Row.of(1, "Apache"), Row.of(2, "Paimon"));
+ }
+
@Test
public void testCreateTableAs() throws Exception {
tEnv.executeSql("CREATE TABLE t (a INT)").await();
@@ -446,7 +462,8 @@ public abstract class HiveCatalogITCaseBase {
tEnv.executeSql(
"CREATE TABLE
t_pk_not_exist_as WITH ('primary-key' = 'aaa') AS SELECT * FROM t_pk_not_exist")
.await())
- .hasRootCauseMessage("Primary key column '[aaa]' is not
defined in the schema.");
+ .hasRootCauseMessage(
+ "Table column [user_id, item_id, behavior, dt, hh]
should include all primary key constraint [aaa]");
// primary key in option and DDL.
assertThatThrownBy(
@@ -480,7 +497,8 @@ public abstract class HiveCatalogITCaseBase {
tEnv.executeSql(
"CREATE TABLE
t_partition_not_exist_as WITH ('partition' = 'aaa') AS SELECT * FROM
t_partition_not_exist")
.await())
- .hasRootCauseMessage("Partition column '[aaa]' is not defined
in the schema.");
+ .hasRootCauseMessage(
+ "Table column [user_id, item_id, behavior, dt, hh]
should include all partition fields [aaa]");
// partition in option and DDL.
assertThatThrownBy(