This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 0160783c1 [core] Add validation for sequence group during create table (#2204)
0160783c1 is described below
commit 0160783c1bb3db53b1252c551cd1419b129fb662
Author: Aitozi <[email protected]>
AuthorDate: Tue Oct 31 10:15:33 2023 +0800
[core] Add validation for sequence group during create table (#2204)
---
.../org/apache/paimon/schema/SchemaValidation.java | 41 ++++++++++++++++++++++
.../apache/paimon/flink/PartialUpdateITCase.java | 39 ++++++++++++++++++++
2 files changed, 80 insertions(+)
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index ed3f28e57..5a34d1187 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -37,13 +37,16 @@ import org.apache.paimon.types.VarCharType;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.paimon.CoreOptions.BUCKET_KEY;
import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
+import static org.apache.paimon.CoreOptions.FIELDS_PREFIX;
import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN;
import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP;
@@ -54,6 +57,7 @@ import static
org.apache.paimon.CoreOptions.SCAN_TIMESTAMP_MILLIS;
import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MAX;
import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MIN;
import static org.apache.paimon.CoreOptions.STREAMING_READ_OVERWRITE;
+import static
org.apache.paimon.mergetree.compact.PartialUpdateMergeFunction.SEQUENCE_GROUP;
import static org.apache.paimon.schema.SystemColumns.KEY_FIELD_PREFIX;
import static org.apache.paimon.schema.SystemColumns.SYSTEM_FIELD_NAMES;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -81,6 +85,8 @@ public class SchemaValidation {
validateStartupMode(options);
+ validateSequenceGroup(schema, options);
+
ChangelogProducer changelogProducer = options.changelogProducer();
if (schema.primaryKeys().isEmpty() && changelogProducer !=
ChangelogProducer.NONE) {
throw new UnsupportedOperationException(
@@ -308,6 +314,41 @@ public class SchemaValidation {
return
configOptions.stream().map(ConfigOption::key).collect(Collectors.joining(","));
}
+    /**
+     * Validates the sequence-group options of a table.
+     *
+     * <p>Every option whose key starts with {@code FIELDS_PREFIX} and ends with {@code
+     * SEQUENCE_GROUP} (e.g. {@code fields.g_1.sequence-group}) is checked as follows:
+     *
+     * <ul>
+     *   <li>the sequence field named between the prefix and the suffix must exist in the schema;
+     *   <li>every comma-separated field in the option value must exist in the schema;
+     *   <li>no field may be listed by more than one sequence-group option.
+     * </ul>
+     *
+     * @param schema the table schema whose field names are checked
+     * @param options the table options to scan for sequence-group keys
+     * @throws IllegalArgumentException if a sequence-group key carries no field name, a
+     *     referenced field is missing from the schema, or a field is claimed by several groups
+     */
+    private static void validateSequenceGroup(TableSchema schema, CoreOptions options) {
+        // Independent of the option being inspected, so resolve it once.
+        List<String> fieldNames = schema.fieldNames();
+        Map<String, Set<String>> fields2Group = new HashMap<>();
+        for (Map.Entry<String, String> entry : options.toMap().entrySet()) {
+            String k = entry.getKey();
+            String v = entry.getValue();
+            if (!k.startsWith(FIELDS_PREFIX) || !k.endsWith(SEQUENCE_GROUP)) {
+                continue;
+            }
+
+            // The field name sits between "<FIELDS_PREFIX>." and ".<SEQUENCE_GROUP>".
+            int begin = FIELDS_PREFIX.length() + 1;
+            int end = k.length() - SEQUENCE_GROUP.length() - 1;
+            if (begin > end) {
+                // A key with nothing between prefix and suffix would otherwise make
+                // substring() throw StringIndexOutOfBoundsException.
+                throw new IllegalArgumentException(
+                        String.format("Invalid sequence group option: %s.", k));
+            }
+            String sequenceFieldName = k.substring(begin, end);
+            if (!fieldNames.contains(sequenceFieldName)) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "The sequence field group: %s can not be found in table schema.",
+                                sequenceFieldName));
+            }
+
+            for (String field : v.split(",")) {
+                if (!fieldNames.contains(field)) {
+                    throw new IllegalArgumentException(
+                            String.format("Field %s can not be found in table schema.", field));
+                }
+                Set<String> group = fields2Group.computeIfAbsent(field, p -> new HashSet<>());
+                // Option keys are unique, so add() only returns false when the same group lists
+                // a field twice; a size above one means a second group claimed the field.
+                if (group.add(k) && group.size() > 1) {
+                    // Sort so the reported groups do not depend on HashSet iteration order.
+                    throw new IllegalArgumentException(
+                            String.format(
+                                    "Field %s is defined repeatedly by multiple groups: %s.",
+                                    field, group.stream().sorted().collect(Collectors.toList())));
+                }
+            }
+        }
+    }
+
private static void validateDefaultValues(TableSchema schema) {
CoreOptions coreOptions = new CoreOptions(schema.options());
Map<String, String> defaultValues =
coreOptions.getFieldDefaultValues();
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
index 887183518..96796ff07 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
@@ -24,6 +24,7 @@ import
org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.CloseableIterator;
+import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
@@ -201,4 +202,42 @@ public class PartialUpdateITCase extends CatalogITCaseBase
{
assertThat(sql("SELECT a, b FROM
SG")).containsExactlyInAnyOrder(Row.of(4, 4));
assertThat(sql("SELECT c, d FROM
SG")).containsExactlyInAnyOrder(Row.of(5, null));
}
+
+ @Test
+ public void testInvalidSequenceGroup() {
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "CREATE TABLE SG ("
+ + "k INT, a INT, b INT, g_1
INT, c INT, d INT, g_2 INT, PRIMARY KEY (k) NOT ENFORCED)"
+ + " WITH ("
+ +
"'merge-engine'='partial-update', "
+ +
"'fields.g_0.sequence-group'='a,b', "
+ +
"'fields.g_2.sequence-group'='c,d');"))
+ .hasRootCauseMessage(
+ "The sequence field group: g_0 can not be found in
table schema.");
+
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "CREATE TABLE SG ("
+ + "k INT, a INT, b INT, g_1
INT, c INT, d INT, g_2 INT, PRIMARY KEY (k) NOT ENFORCED)"
+ + " WITH ("
+ +
"'merge-engine'='partial-update', "
+ +
"'fields.g_1.sequence-group'='a1,b', "
+ +
"'fields.g_2.sequence-group'='c,d');"))
+ .hasRootCauseMessage("Field a1 can not be found in table
schema.");
+
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "CREATE TABLE SG ("
+ + "k INT, a INT, b INT, g_1
INT, c INT, d INT, g_2 INT, PRIMARY KEY (k) NOT ENFORCED)"
+ + " WITH ("
+ +
"'merge-engine'='partial-update', "
+ +
"'fields.g_1.sequence-group'='a,b', "
+ +
"'fields.g_2.sequence-group'='a,d');"))
+ .hasRootCauseMessage(
+ "Field a is defined repeatedly by multiple groups:
[fields.g_1.sequence-group, fields.g_2.sequence-group].");
+ }
}