This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 0160783c1 [core] Add validation for sequence group during create table (#2204)
0160783c1 is described below

commit 0160783c1bb3db53b1252c551cd1419b129fb662
Author: Aitozi <[email protected]>
AuthorDate: Tue Oct 31 10:15:33 2023 +0800

    [core] Add validation for sequence group during create table (#2204)
---
 .../org/apache/paimon/schema/SchemaValidation.java | 41 ++++++++++++++++++++++
 .../apache/paimon/flink/PartialUpdateITCase.java   | 39 ++++++++++++++++++++
 2 files changed, 80 insertions(+)

diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index ed3f28e57..5a34d1187 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -37,13 +37,16 @@ import org.apache.paimon.types.VarCharType;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.CoreOptions.BUCKET_KEY;
 import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
+import static org.apache.paimon.CoreOptions.FIELDS_PREFIX;
 import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
 import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN;
 import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP;
@@ -54,6 +57,7 @@ import static org.apache.paimon.CoreOptions.SCAN_TIMESTAMP_MILLIS;
 import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MAX;
 import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MIN;
 import static org.apache.paimon.CoreOptions.STREAMING_READ_OVERWRITE;
+import static 
org.apache.paimon.mergetree.compact.PartialUpdateMergeFunction.SEQUENCE_GROUP;
 import static org.apache.paimon.schema.SystemColumns.KEY_FIELD_PREFIX;
 import static org.apache.paimon.schema.SystemColumns.SYSTEM_FIELD_NAMES;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -81,6 +85,8 @@ public class SchemaValidation {
 
         validateStartupMode(options);
 
+        validateSequenceGroup(schema, options);
+
         ChangelogProducer changelogProducer = options.changelogProducer();
         if (schema.primaryKeys().isEmpty() && changelogProducer != 
ChangelogProducer.NONE) {
             throw new UnsupportedOperationException(
@@ -308,6 +314,41 @@ public class SchemaValidation {
         return 
configOptions.stream().map(ConfigOption::key).collect(Collectors.joining(","));
     }
 
+    private static void validateSequenceGroup(TableSchema schema, CoreOptions 
options) {
+        Map<String, Set<String>> fields2Group = new HashMap<>();
+        for (Map.Entry<String, String> entry : options.toMap().entrySet()) {
+            String k = entry.getKey();
+            String v = entry.getValue();
+            List<String> fieldNames = schema.fieldNames();
+            if (k.startsWith(FIELDS_PREFIX) && k.endsWith(SEQUENCE_GROUP)) {
+                String sequenceFieldName =
+                        k.substring(
+                                FIELDS_PREFIX.length() + 1,
+                                k.length() - SEQUENCE_GROUP.length() - 1);
+                if (!fieldNames.contains(sequenceFieldName)) {
+                    throw new IllegalArgumentException(
+                            String.format(
+                                    "The sequence field group: %s can not be 
found in table schema.",
+                                    sequenceFieldName));
+                }
+
+                for (String field : v.split(",")) {
+                    if (!fieldNames.contains(field)) {
+                        throw new IllegalArgumentException(
+                                String.format("Field %s can not be found in 
table schema.", field));
+                    }
+                    Set<String> group = fields2Group.computeIfAbsent(field, p 
-> new HashSet<>());
+                    if (group.add(k) && group.size() > 1) {
+                        throw new IllegalArgumentException(
+                                String.format(
+                                        "Field %s is defined repeatedly by 
multiple groups: %s.",
+                                        field, group));
+                    }
+                }
+            }
+        }
+    }
+
     private static void validateDefaultValues(TableSchema schema) {
         CoreOptions coreOptions = new CoreOptions(schema.options());
         Map<String, String> defaultValues = 
coreOptions.getFieldDefaultValues();
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
index 887183518..96796ff07 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
@@ -24,6 +24,7 @@ import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.CloseableIterator;
+import org.assertj.core.api.Assertions;
 import org.awaitility.Awaitility;
 import org.junit.jupiter.api.Test;
 
@@ -201,4 +202,42 @@ public class PartialUpdateITCase extends CatalogITCaseBase {
         assertThat(sql("SELECT a, b FROM 
SG")).containsExactlyInAnyOrder(Row.of(4, 4));
         assertThat(sql("SELECT c, d FROM 
SG")).containsExactlyInAnyOrder(Row.of(5, null));
     }
+
+    @Test
+    public void testInvalidSequenceGroup() {
+        Assertions.assertThatThrownBy(
+                        () ->
+                                sql(
+                                        "CREATE TABLE SG ("
+                                                + "k INT, a INT, b INT, g_1 
INT, c INT, d INT, g_2 INT, PRIMARY KEY (k) NOT ENFORCED)"
+                                                + " WITH ("
+                                                + 
"'merge-engine'='partial-update', "
+                                                + 
"'fields.g_0.sequence-group'='a,b', "
+                                                + 
"'fields.g_2.sequence-group'='c,d');"))
+                .hasRootCauseMessage(
+                        "The sequence field group: g_0 can not be found in 
table schema.");
+
+        Assertions.assertThatThrownBy(
+                        () ->
+                                sql(
+                                        "CREATE TABLE SG ("
+                                                + "k INT, a INT, b INT, g_1 
INT, c INT, d INT, g_2 INT, PRIMARY KEY (k) NOT ENFORCED)"
+                                                + " WITH ("
+                                                + 
"'merge-engine'='partial-update', "
+                                                + 
"'fields.g_1.sequence-group'='a1,b', "
+                                                + 
"'fields.g_2.sequence-group'='c,d');"))
+                .hasRootCauseMessage("Field a1 can not be found in table 
schema.");
+
+        Assertions.assertThatThrownBy(
+                        () ->
+                                sql(
+                                        "CREATE TABLE SG ("
+                                                + "k INT, a INT, b INT, g_1 
INT, c INT, d INT, g_2 INT, PRIMARY KEY (k) NOT ENFORCED)"
+                                                + " WITH ("
+                                                + 
"'merge-engine'='partial-update', "
+                                                + 
"'fields.g_1.sequence-group'='a,b', "
+                                                + 
"'fields.g_2.sequence-group'='a,d');"))
+                .hasRootCauseMessage(
+                        "Field a is defined repeatedly by multiple groups: 
[fields.g_1.sequence-group, fields.g_2.sequence-group].");
+    }
 }

Reply via email to