This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 5c0f59222 [core] Add more validation for sequence field during create table. (#3022)
5c0f59222 is described below
commit 5c0f592229fa2ecd111150f9893d1bba31d3b348
Author: sai <[email protected]>
AuthorDate: Mon Mar 18 10:24:45 2024 +0800
[core] Add more validation for sequence field during create table. (#3022)
---
.../main/java/org/apache/paimon/CoreOptions.java | 36 --------
.../org/apache/paimon/schema/SchemaValidation.java | 98 ++++++++++++----------
.../apache/paimon/schema/SchemaManagerTest.java | 2 +-
.../java/org/apache/paimon/schema/SchemaUtils.java | 7 --
.../org/apache/paimon/schema/TableSchemaTest.java | 43 ++++++++++
5 files changed, 98 insertions(+), 88 deletions(-)
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index b5b7ea432..19554296c 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -28,7 +28,6 @@ import org.apache.paimon.lookup.LookupStrategy;
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
-import org.apache.paimon.options.OptionsUtils;
import org.apache.paimon.options.description.DescribedEnum;
import org.apache.paimon.options.description.Description;
import org.apache.paimon.options.description.InlineElement;
@@ -2103,41 +2102,6 @@ public class CoreOptions implements Serializable {
}
}
- /** Specifies the way of making up time precision for sequence field. */
- public enum SequenceAutoPadding implements DescribedEnum {
- ROW_KIND_FLAG(
- "row-kind-flag",
- "Pads a bit flag to indicate whether it is retract (0) or add
(1) message."),
- SECOND_TO_MICRO(
- "second-to-micro",
- "Pads the sequence field that indicates time with precision of
seconds to micro-second."),
- MILLIS_TO_MICRO(
- "millis-to-micro",
- "Pads the sequence field that indicates time with precision of
milli-second to micro-second.");
-
- private final String value;
- private final String description;
-
- SequenceAutoPadding(String value, String description) {
- this.value = value;
- this.description = description;
- }
-
- @Override
- public String toString() {
- return value;
- }
-
- @Override
- public InlineElement getDescription() {
- return text(description);
- }
-
- public static SequenceAutoPadding fromString(String s) {
- return OptionsUtils.convertToEnum(s, SequenceAutoPadding.class);
- }
- }
-
/** The mode for tag creation. */
public enum TagCreationMode implements DescribedEnum {
NONE("none", "No automatically created tags."),
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 5cd00d8c6..cbdcb64e5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -42,7 +42,6 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@@ -91,6 +90,8 @@ public class SchemaValidation {
validateStartupMode(options);
+ validateSequenceField(schema, options);
+
validateSequenceGroup(schema, options);
ChangelogProducer changelogProducer = options.changelogProducer();
@@ -167,44 +168,6 @@ public class SchemaValidation {
}
}
- List<String> sequenceField = options.sequenceField();
- if (sequenceField.size() > 0) {
- checkArgument(
- schema.fieldNames().containsAll(sequenceField),
- "Nonexistent sequence fields: '%s'",
- sequenceField);
- }
-
- Optional<String> rowkindField = options.rowkindField();
- rowkindField.ifPresent(
- field ->
- checkArgument(
- schema.fieldNames().contains(field),
- "Nonexistent rowkind field: '%s'",
- field));
-
- if (sequenceField.size() > 0) {
- sequenceField.forEach(
- field ->
- checkArgument(
- options.fieldAggFunc(field) == null,
- "Should not define aggregation on sequence
field: '%s'",
- field));
- }
-
- CoreOptions.MergeEngine mergeEngine = options.mergeEngine();
- if (mergeEngine == CoreOptions.MergeEngine.FIRST_ROW) {
- if (sequenceField.size() > 0) {
- throw new IllegalArgumentException(
- "Do not support use sequence field on FIRST_MERGE
merge engine");
- }
-
- if (changelogProducer != ChangelogProducer.LOOKUP) {
- throw new IllegalArgumentException(
- "Only support 'lookup' changelog-producer on
FIRST_MERGE merge engine");
- }
- }
-
if (schema.crossPartitionUpdate()) {
if (options.bucket() != -1) {
throw new IllegalArgumentException(
@@ -213,16 +176,23 @@ public class SchemaValidation {
+ "(Primary key constraint %s not
include all partition fields %s).",
schema.primaryKeys(), schema.partitionKeys()));
}
+ }
- if (sequenceField.size() > 0) {
+ if (options.mergeEngine() == CoreOptions.MergeEngine.FIRST_ROW) {
+ if (options.changelogProducer() != ChangelogProducer.LOOKUP) {
throw new IllegalArgumentException(
- String.format(
- "You can not use sequence.field in cross
partition update case "
- + "(Primary key constraint %s not
include all partition fields %s).",
- schema.primaryKeys(), schema.partitionKeys()));
+ "Only support 'lookup' changelog-producer on
FIRST_MERGE merge engine");
}
}
+ options.rowkindField()
+ .ifPresent(
+ field ->
+ checkArgument(
+ schema.fieldNames().contains(field),
+ "Rowkind field: '%s' can not be found
in table schema.",
+ field));
+
if (options.deletionVectorsEnabled()) {
validateForDeletionVectors(schema, options);
}
@@ -498,4 +468,44 @@ public class SchemaValidation {
!options.mergeEngine().equals(MergeEngine.FIRST_ROW),
"Deletion vectors mode is not supported for first row merge
engine now.");
}
+
+ private static void validateSequenceField(TableSchema schema, CoreOptions
options) {
+ List<String> sequenceField = options.sequenceField();
+ if (sequenceField.size() > 0) {
+ Map<String, Integer> fieldCount =
+ sequenceField.stream()
+ .collect(Collectors.toMap(field -> field, field ->
1, Integer::sum));
+
+ sequenceField.forEach(
+ field -> {
+ checkArgument(
+ schema.fieldNames().contains(field),
+ "Sequence field: '%s' can not be found in
table schema.",
+ field);
+
+ checkArgument(
+ options.fieldAggFunc(field) == null,
+ "Should not define aggregation on sequence
field: '%s'.",
+ field);
+
+ checkArgument(
+ fieldCount.get(field) == 1,
+ "Sequence field '%s' is defined repeatedly.",
+ field);
+ });
+
+ if (options.mergeEngine() == CoreOptions.MergeEngine.FIRST_ROW) {
+ throw new IllegalArgumentException(
+ "Do not support use sequence field on FIRST_MERGE
merge engine.");
+ }
+
+ if (schema.crossPartitionUpdate()) {
+ throw new IllegalArgumentException(
+ String.format(
+ "You can not use sequence.field in cross
partition update case "
+ + "(Primary key constraint '%s' not
include all partition fields '%s').",
+ schema.primaryKeys(), schema.partitionKeys()));
+ }
+ }
+ }
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
index db37284be..b5ee8c639 100644
--- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
@@ -131,7 +131,7 @@ public class SchemaManagerTest {
"f4"),
""))))
.isInstanceOf(IllegalArgumentException.class)
- .hasMessageContaining("Nonexistent sequence fields: '[f4]'");
+ .hasMessageContaining("Sequence field: 'f4' can not be found
in table schema.");
}
@Test
diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaUtils.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaUtils.java
index 435c3263b..67010e586 100644
--- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaUtils.java
+++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaUtils.java
@@ -18,7 +18,6 @@
package org.apache.paimon.schema;
-import org.apache.paimon.CoreOptions;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Preconditions;
@@ -69,12 +68,6 @@ public class SchemaUtils {
id = 0;
}
- String sequenceField =
options.get(CoreOptions.SEQUENCE_FIELD.key());
- Preconditions.checkArgument(
- sequenceField == null ||
rowType.getFieldNames().contains(sequenceField),
- "Nonexistent sequence field: '%s'",
- sequenceField);
-
TableSchema newSchema =
new TableSchema(
id,
diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/TableSchemaTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/TableSchemaTest.java
index d735ea819..a39ce083b 100644
--- a/paimon-core/src/test/java/org/apache/paimon/schema/TableSchemaTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/schema/TableSchemaTest.java
@@ -18,6 +18,7 @@
package org.apache.paimon.schema;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
@@ -30,7 +31,10 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static org.apache.paimon.CoreOptions.AGG_FUNCTION;
import static org.apache.paimon.CoreOptions.BUCKET;
+import static org.apache.paimon.CoreOptions.FIELDS_PREFIX;
+import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
import static org.apache.paimon.CoreOptions.SEQUENCE_FIELD;
import static org.apache.paimon.schema.SchemaValidation.validateTableSchema;
import static org.assertj.core.api.Assertions.assertThat;
@@ -118,6 +122,45 @@ public class TableSchemaTest {
.hasMessage("Broken schema, field id 0 is duplicated.");
}
+ @Test
+ public void testSequenceField() {
+ List<DataField> fields =
+ Arrays.asList(
+ new DataField(0, "f0", DataTypes.INT()),
+ new DataField(1, "f1", DataTypes.INT()),
+ new DataField(2, "f2", DataTypes.INT()),
+ new DataField(3, "f3", DataTypes.INT()));
+ List<String> partitionKeys = Collections.singletonList("f0");
+ List<String> primaryKeys = Collections.singletonList("f1");
+ Map<String, String> options = new HashMap<>();
+
+ TableSchema schema =
+ new TableSchema(1, fields, 10, partitionKeys, primaryKeys,
options, "");
+
+ options.put(SEQUENCE_FIELD.key(), "f3");
+ assertThatThrownBy(() -> validateTableSchema(schema))
+ .hasMessageContaining(
+ "You can not use sequence.field in cross partition
update case (Primary key constraint '[f1]' not include all partition fields
'[f0]').");
+
+ options.put(SEQUENCE_FIELD.key(), "f4");
+ assertThatThrownBy(() -> validateTableSchema(schema))
+ .hasMessageContaining("Sequence field: 'f4' can not be found
in table schema.");
+
+ options.put(SEQUENCE_FIELD.key(), "f2,f3,f3");
+ assertThatThrownBy(() -> validateTableSchema(schema))
+ .hasMessageContaining("Sequence field 'f3' is defined
repeatedly.");
+
+ options.put(SEQUENCE_FIELD.key(), "f3");
+ options.put(MERGE_ENGINE.key(),
CoreOptions.MergeEngine.FIRST_ROW.toString());
+ assertThatThrownBy(() -> validateTableSchema(schema))
+ .hasMessageContaining(
+ "Do not support use sequence field on FIRST_MERGE
merge engine.");
+
+ options.put(FIELDS_PREFIX + ".f3." + AGG_FUNCTION, "max");
+ assertThatThrownBy(() -> validateTableSchema(schema))
+ .hasMessageContaining("Should not define aggregation on
sequence field: 'f3'.");
+ }
+
static RowType newRowType(boolean isNullable, int fieldId) {
return new RowType(
isNullable,