This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c0f59222 [core] Add more validation for sequence field during create table. (#3022)
5c0f59222 is described below

commit 5c0f592229fa2ecd111150f9893d1bba31d3b348
Author: sai <[email protected]>
AuthorDate: Mon Mar 18 10:24:45 2024 +0800

    [core] Add more validation for sequence field during create table. (#3022)
---
 .../main/java/org/apache/paimon/CoreOptions.java   | 36 --------
 .../org/apache/paimon/schema/SchemaValidation.java | 98 ++++++++++++----------
 .../apache/paimon/schema/SchemaManagerTest.java    |  2 +-
 .../java/org/apache/paimon/schema/SchemaUtils.java |  7 --
 .../org/apache/paimon/schema/TableSchemaTest.java  | 43 ++++++++++
 5 files changed, 98 insertions(+), 88 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index b5b7ea432..19554296c 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -28,7 +28,6 @@ import org.apache.paimon.lookup.LookupStrategy;
 import org.apache.paimon.options.ConfigOption;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
-import org.apache.paimon.options.OptionsUtils;
 import org.apache.paimon.options.description.DescribedEnum;
 import org.apache.paimon.options.description.Description;
 import org.apache.paimon.options.description.InlineElement;
@@ -2103,41 +2102,6 @@ public class CoreOptions implements Serializable {
         }
     }
 
-    /** Specifies the way of making up time precision for sequence field. */
-    public enum SequenceAutoPadding implements DescribedEnum {
-        ROW_KIND_FLAG(
-                "row-kind-flag",
-                "Pads a bit flag to indicate whether it is retract (0) or add 
(1) message."),
-        SECOND_TO_MICRO(
-                "second-to-micro",
-                "Pads the sequence field that indicates time with precision of 
seconds to micro-second."),
-        MILLIS_TO_MICRO(
-                "millis-to-micro",
-                "Pads the sequence field that indicates time with precision of 
milli-second to micro-second.");
-
-        private final String value;
-        private final String description;
-
-        SequenceAutoPadding(String value, String description) {
-            this.value = value;
-            this.description = description;
-        }
-
-        @Override
-        public String toString() {
-            return value;
-        }
-
-        @Override
-        public InlineElement getDescription() {
-            return text(description);
-        }
-
-        public static SequenceAutoPadding fromString(String s) {
-            return OptionsUtils.convertToEnum(s, SequenceAutoPadding.class);
-        }
-    }
-
     /** The mode for tag creation. */
     public enum TagCreationMode implements DescribedEnum {
         NONE("none", "No automatically created tags."),
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 5cd00d8c6..cbdcb64e5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -42,7 +42,6 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -91,6 +90,8 @@ public class SchemaValidation {
 
         validateStartupMode(options);
 
+        validateSequenceField(schema, options);
+
         validateSequenceGroup(schema, options);
 
         ChangelogProducer changelogProducer = options.changelogProducer();
@@ -167,44 +168,6 @@ public class SchemaValidation {
             }
         }
 
-        List<String> sequenceField = options.sequenceField();
-        if (sequenceField.size() > 0) {
-            checkArgument(
-                    schema.fieldNames().containsAll(sequenceField),
-                    "Nonexistent sequence fields: '%s'",
-                    sequenceField);
-        }
-
-        Optional<String> rowkindField = options.rowkindField();
-        rowkindField.ifPresent(
-                field ->
-                        checkArgument(
-                                schema.fieldNames().contains(field),
-                                "Nonexistent rowkind field: '%s'",
-                                field));
-
-        if (sequenceField.size() > 0) {
-            sequenceField.forEach(
-                    field ->
-                            checkArgument(
-                                    options.fieldAggFunc(field) == null,
-                                    "Should not define aggregation on sequence 
field: '%s'",
-                                    field));
-        }
-
-        CoreOptions.MergeEngine mergeEngine = options.mergeEngine();
-        if (mergeEngine == CoreOptions.MergeEngine.FIRST_ROW) {
-            if (sequenceField.size() > 0) {
-                throw new IllegalArgumentException(
-                        "Do not support use sequence field on FIRST_MERGE 
merge engine");
-            }
-
-            if (changelogProducer != ChangelogProducer.LOOKUP) {
-                throw new IllegalArgumentException(
-                        "Only support 'lookup' changelog-producer on 
FIRST_MERGE merge engine");
-            }
-        }
-
         if (schema.crossPartitionUpdate()) {
             if (options.bucket() != -1) {
                 throw new IllegalArgumentException(
@@ -213,16 +176,23 @@ public class SchemaValidation {
                                         + "(Primary key constraint %s not 
include all partition fields %s).",
                                 schema.primaryKeys(), schema.partitionKeys()));
             }
+        }
 
-            if (sequenceField.size() > 0) {
+        if (options.mergeEngine() == CoreOptions.MergeEngine.FIRST_ROW) {
+            if (options.changelogProducer() != ChangelogProducer.LOOKUP) {
                 throw new IllegalArgumentException(
-                        String.format(
-                                "You can not use sequence.field in cross 
partition update case "
-                                        + "(Primary key constraint %s not 
include all partition fields %s).",
-                                schema.primaryKeys(), schema.partitionKeys()));
+                        "Only support 'lookup' changelog-producer on 
FIRST_MERGE merge engine");
             }
         }
 
+        options.rowkindField()
+                .ifPresent(
+                        field ->
+                                checkArgument(
+                                        schema.fieldNames().contains(field),
+                                        "Rowkind field: '%s' can not be found 
in table schema.",
+                                        field));
+
         if (options.deletionVectorsEnabled()) {
             validateForDeletionVectors(schema, options);
         }
@@ -498,4 +468,44 @@ public class SchemaValidation {
                 !options.mergeEngine().equals(MergeEngine.FIRST_ROW),
                 "Deletion vectors mode is not supported for first row merge 
engine now.");
     }
+
+    private static void validateSequenceField(TableSchema schema, CoreOptions 
options) {
+        List<String> sequenceField = options.sequenceField();
+        if (sequenceField.size() > 0) {
+            Map<String, Integer> fieldCount =
+                    sequenceField.stream()
+                            .collect(Collectors.toMap(field -> field, field -> 
1, Integer::sum));
+
+            sequenceField.forEach(
+                    field -> {
+                        checkArgument(
+                                schema.fieldNames().contains(field),
+                                "Sequence field: '%s' can not be found in 
table schema.",
+                                field);
+
+                        checkArgument(
+                                options.fieldAggFunc(field) == null,
+                                "Should not define aggregation on sequence 
field: '%s'.",
+                                field);
+
+                        checkArgument(
+                                fieldCount.get(field) == 1,
+                                "Sequence field '%s' is defined repeatedly.",
+                                field);
+                    });
+
+            if (options.mergeEngine() == CoreOptions.MergeEngine.FIRST_ROW) {
+                throw new IllegalArgumentException(
+                        "Do not support use sequence field on FIRST_MERGE 
merge engine.");
+            }
+
+            if (schema.crossPartitionUpdate()) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "You can not use sequence.field in cross 
partition update case "
+                                        + "(Primary key constraint '%s' not 
include all partition fields '%s').",
+                                schema.primaryKeys(), schema.partitionKeys()));
+            }
+        }
+    }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java 
b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
index db37284be..b5ee8c639 100644
--- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
@@ -131,7 +131,7 @@ public class SchemaManagerTest {
                                                                         "f4"),
                                                                 ""))))
                 .isInstanceOf(IllegalArgumentException.class)
-                .hasMessageContaining("Nonexistent sequence fields: '[f4]'");
+                .hasMessageContaining("Sequence field: 'f4' can not be found 
in table schema.");
     }
 
     @Test
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaUtils.java 
b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaUtils.java
index 435c3263b..67010e586 100644
--- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaUtils.java
+++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaUtils.java
@@ -18,7 +18,6 @@
 
 package org.apache.paimon.schema;
 
-import org.apache.paimon.CoreOptions;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.Preconditions;
@@ -69,12 +68,6 @@ public class SchemaUtils {
                 id = 0;
             }
 
-            String sequenceField = 
options.get(CoreOptions.SEQUENCE_FIELD.key());
-            Preconditions.checkArgument(
-                    sequenceField == null || 
rowType.getFieldNames().contains(sequenceField),
-                    "Nonexistent sequence field: '%s'",
-                    sequenceField);
-
             TableSchema newSchema =
                     new TableSchema(
                             id,
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/schema/TableSchemaTest.java 
b/paimon-core/src/test/java/org/apache/paimon/schema/TableSchemaTest.java
index d735ea819..a39ce083b 100644
--- a/paimon-core/src/test/java/org/apache/paimon/schema/TableSchemaTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/schema/TableSchemaTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.schema;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
@@ -30,7 +31,10 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.paimon.CoreOptions.AGG_FUNCTION;
 import static org.apache.paimon.CoreOptions.BUCKET;
+import static org.apache.paimon.CoreOptions.FIELDS_PREFIX;
+import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
 import static org.apache.paimon.CoreOptions.SEQUENCE_FIELD;
 import static org.apache.paimon.schema.SchemaValidation.validateTableSchema;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -118,6 +122,45 @@ public class TableSchemaTest {
                 .hasMessage("Broken schema, field id 0 is duplicated.");
     }
 
+    @Test
+    public void testSequenceField() {
+        List<DataField> fields =
+                Arrays.asList(
+                        new DataField(0, "f0", DataTypes.INT()),
+                        new DataField(1, "f1", DataTypes.INT()),
+                        new DataField(2, "f2", DataTypes.INT()),
+                        new DataField(3, "f3", DataTypes.INT()));
+        List<String> partitionKeys = Collections.singletonList("f0");
+        List<String> primaryKeys = Collections.singletonList("f1");
+        Map<String, String> options = new HashMap<>();
+
+        TableSchema schema =
+                new TableSchema(1, fields, 10, partitionKeys, primaryKeys, 
options, "");
+
+        options.put(SEQUENCE_FIELD.key(), "f3");
+        assertThatThrownBy(() -> validateTableSchema(schema))
+                .hasMessageContaining(
+                        "You can not use sequence.field in cross partition 
update case (Primary key constraint '[f1]' not include all partition fields 
'[f0]').");
+
+        options.put(SEQUENCE_FIELD.key(), "f4");
+        assertThatThrownBy(() -> validateTableSchema(schema))
+                .hasMessageContaining("Sequence field: 'f4' can not be found 
in table schema.");
+
+        options.put(SEQUENCE_FIELD.key(), "f2,f3,f3");
+        assertThatThrownBy(() -> validateTableSchema(schema))
+                .hasMessageContaining("Sequence field 'f3' is defined 
repeatedly.");
+
+        options.put(SEQUENCE_FIELD.key(), "f3");
+        options.put(MERGE_ENGINE.key(), 
CoreOptions.MergeEngine.FIRST_ROW.toString());
+        assertThatThrownBy(() -> validateTableSchema(schema))
+                .hasMessageContaining(
+                        "Do not support use sequence field on FIRST_MERGE 
merge engine.");
+
+        options.put(FIELDS_PREFIX + ".f3." + AGG_FUNCTION, "max");
+        assertThatThrownBy(() -> validateTableSchema(schema))
+                .hasMessageContaining("Should not define aggregation on 
sequence field: 'f3'.");
+    }
+
     static RowType newRowType(boolean isNullable, int fieldId) {
         return new RowType(
                 isNullable,

Reply via email to